This is an automated email from the ASF dual-hosted git repository.

sdd pushed a commit to branch feature/table-scan-delete-file-handling
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git

commit a258d4a0837df1c0940270fc71a643b0870bcc63
Author: Scott Donnelly <sc...@donnel.ly>
AuthorDate: Wed Sep 25 07:57:28 2024 +0100

    feat: add delete_file_index and populate in table scan
---
 crates/iceberg/src/arrow/reader.rs                 |  20 +-
 crates/iceberg/src/delete_file_index.rs            | 110 ++++++++
 crates/iceberg/src/expr/predicate.rs               |   6 +
 crates/iceberg/src/lib.rs                          |   2 +
 crates/iceberg/src/scan.rs                         | 276 +++++++++++++++++----
 .../testdata/example_table_metadata_v2.json        |  16 +-
 6 files changed, 372 insertions(+), 58 deletions(-)
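
A minimal usage sketch of the new opt-in flag (not part of this commit; it
assumes the existing `Table::scan()` builder entry point, a `plan_files()`
method yielding a `FileScanTaskStream`, and `futures::TryStreamExt` in scope):

    // Opt in to delete file processing, which stays disabled by default for now.
    let scan = table
        .scan()
        .with_delete_file_processing_enabled(true)
        .build()?;

    // Each FileScanTask now carries the delete files that may apply to its
    // data file in the new `deletes` field.
    let mut tasks = scan.plan_files().await?;
    while let Some(task) = tasks.try_next().await? {
        println!("{}: {} delete file(s)", task.data_file_path, task.deletes.len());
    }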

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index b4e15821..7e6df878 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -47,8 +47,12 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, PrimitiveType, Schema};
+use crate::scan::{
+    ArrowRecordBatchStream, FileScanTask, FileScanTaskStream,
+};
+use crate::spec::{
+    Datum, PrimitiveType, Schema,
+};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
 
@@ -712,10 +716,14 @@ impl PredicateConverter<'_> {
             let index = self
                 .column_indices
                 .iter()
-                .position(|&idx| idx == *column_idx).ok_or(Error::new(ErrorKind::DataInvalid, format!(
-                    "Leave column `{}` in predicates cannot be found in the required column indices.",
-                    reference.field().name
-                )))?;
+                .position(|&idx| idx == *column_idx)
+                .ok_or(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                "Leave column `{}` in predicates cannot be found in the 
required column indices.",
+                reference.field().name
+            ),
+                ))?;
 
             Ok(Some(index))
         } else {
diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs
new file mode 100644
index 00000000..5265c08f
--- /dev/null
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -0,0 +1,110 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use futures::channel::mpsc;
+use futures::{StreamExt, TryStreamExt};
+use tokio::sync::watch;
+
+use crate::scan::FileScanTaskDeleteFile;
+use crate::spec::{DataContentType, DataFile};
+use crate::Result;
+
+type DeleteFileIndexRef = Arc<Result<DeleteFileIndex>>;
+pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver<Option<DeleteFileIndexRef>>;
+
+/// Index of delete files
+#[derive(Debug)]
+pub(crate) struct DeleteFileIndex {
+    #[allow(dead_code)]
+    global_deletes: Vec<Arc<FileScanTaskDeleteFile>>,
+    #[allow(dead_code)]
+    equality_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>>,
+    #[allow(dead_code)]
+    positional_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>>,
+    positional_deletes_by_path: HashMap<String, Vec<Arc<FileScanTaskDeleteFile>>>,
+}
+
+impl DeleteFileIndex {
+    pub(crate) fn from_receiver(
+        receiver: mpsc::Receiver<Result<FileScanTaskDeleteFile>>,
+    ) -> watch::Receiver<Option<DeleteFileIndexRef>> {
+        let (tx, rx) = watch::channel(None);
+
+        let delete_file_stream = receiver.boxed();
+        tokio::spawn(async move {
+            let delete_files = delete_file_stream.try_collect::<Vec<_>>().await;
+            let delete_file_index = delete_files.map(DeleteFileIndex::from_delete_files);
+            let delete_file_index = Arc::new(delete_file_index);
+            tx.send(Some(delete_file_index))
+        });
+
+        rx
+    }
+
+    fn from_delete_files(files: Vec<FileScanTaskDeleteFile>) -> Self {
+        let mut equality_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>> =
+            HashMap::default();
+        let mut positional_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>> =
+            HashMap::default();
+        let mut positional_deletes_by_path: HashMap<String, Vec<Arc<FileScanTaskDeleteFile>>> =
+            HashMap::default();
+
+        files.into_iter().for_each(|file| {
+            let arc_file = Arc::new(file);
+            match arc_file.file_type {
+                DataContentType::PositionDeletes => {
+                    // TODO: implement logic from ContentFileUtil.referencedDataFile
+                    // see https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54
+                    let referenced_data_file_path = "TODO".to_string();
+
+                    positional_deletes_by_path
+                        .entry(referenced_data_file_path)
+                        .and_modify(|entry| {
+                            entry.push(arc_file.clone());
+                        })
+                        .or_insert(vec![arc_file.clone()]);
+
+                    positional_deletes_by_partition
+                        .entry(arc_file.partition_spec_id)
+                        .and_modify(|entry| {
+                            entry.push(arc_file.clone());
+                        })
+                        .or_insert(vec![arc_file.clone()]);
+                }
+                DataContentType::EqualityDeletes => {
+                    equality_deletes_by_partition
+                        .entry(arc_file.partition_spec_id)
+                        .and_modify(|entry| {
+                            entry.push(arc_file.clone());
+                        })
+                        .or_insert(vec![arc_file.clone()]);
+                }
+                _ => unreachable!(),
+            }
+        });
+
+        DeleteFileIndex {
+            global_deletes: vec![],
+            equality_deletes_by_partition,
+            positional_deletes_by_partition,
+            positional_deletes_by_path,
+        }
+    }
+
+    /// Determine all the delete files that apply to the provided `DataFile`.
+    pub(crate) fn get_deletes_for_data_file(
+        &self,
+        data_file: &DataFile,
+    ) -> Vec<FileScanTaskDeleteFile> {
+        let mut results = vec![];
+
+        if let Some(positional_deletes) = self.positional_deletes_by_path.get(data_file.file_path())
+        {
+            results.extend(positional_deletes.iter().map(|i| i.as_ref().clone()))
+        }
+
+        // TODO: equality deletes
+
+        results
+    }
+}
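
The index above is built on a background tokio task and published through a
`tokio::sync::watch` channel: the receiver starts as `None`, and consumers call
`wait_for(Option::is_some)` to block until the finished index (or an error)
arrives, as `into_file_scan_task` in scan.rs does further down. A
self-contained sketch of that handshake (illustrative only, with a plain
`String` standing in for the real index):

    use std::sync::Arc;
    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        // Producer side: publish `None` first, then the built value when ready.
        let (tx, mut rx) = watch::channel::<Option<Arc<String>>>(None);
        tokio::spawn(async move {
            // ... collect delete files and build the index here ...
            let _ = tx.send(Some(Arc::new("index ready".to_string())));
        });

        // Consumer side: block until a `Some` value has been published.
        let value = rx.wait_for(Option::is_some).await.unwrap();
        if let Some(index) = value.as_ref() {
            println!("{index}");
        }
    }
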
diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs
index e0f6a784..f38a0cca 100644
--- a/crates/iceberg/src/expr/predicate.rs
+++ b/crates/iceberg/src/expr/predicate.rs
@@ -724,6 +724,12 @@ pub enum BoundPredicate {
     Set(SetExpression<BoundReference>),
 }
 
+impl BoundPredicate {
+    pub(crate) fn and(self, other: BoundPredicate) -> BoundPredicate {
+        BoundPredicate::And(LogicalExpression::new([Box::new(self), Box::new(other)]))
+    }
+}
+
 impl Display for BoundPredicate {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index fe5a5299..466592bb 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -83,6 +83,8 @@ pub mod transform;
 mod runtime;
 
 pub mod arrow;
+// pub(crate) mod delete_file_index;
+pub(crate) mod delete_file_index;
 mod utils;
 pub mod writer;
 
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 5a97e74e..acf10f5e 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -27,6 +27,7 @@ use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use serde::{Deserialize, Serialize};
 
 use crate::arrow::ArrowReaderBuilder;
+use crate::delete_file_index::{DeleteFileIndex, DeleteFileIndexRefReceiver};
 use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
 use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
 use crate::expr::visitors::inclusive_projection::InclusiveProjection;
@@ -62,6 +63,11 @@ pub struct TableScanBuilder<'a> {
     concurrency_limit_manifest_files: usize,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+
+    // TODO: defaults to false for now whilst delete file processing
+    // is still being worked on but will switch to a default of true
+    // once this work is complete
+    delete_file_processing_enabled: bool,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -80,6 +86,7 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: num_cpus,
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
+            delete_file_processing_enabled: false,
         }
     }
 
@@ -186,6 +193,17 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Determines whether to enable delete file processing (currently disabled by default)
+    ///
+    /// When disabled, delete files are ignored.
+    pub fn with_delete_file_processing_enabled(
+        mut self,
+        delete_file_processing_enabled: bool,
+    ) -> Self {
+        self.delete_file_processing_enabled = delete_file_processing_enabled;
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -304,6 +322,7 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            delete_file_processing_enabled: self.delete_file_processing_enabled,
         })
     }
 }
@@ -329,6 +348,7 @@ pub struct TableScan {
 
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    delete_file_processing_enabled: bool,
 }
 
 /// PlanContext wraps a [`SnapshotRef`] alongside all the other
@@ -357,18 +377,39 @@ impl TableScan {
         let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
 
         // used to stream ManifestEntryContexts between stages of the file plan operation
-        let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) =
+        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
+            channel(concurrency_limit_manifest_files);
+        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
             channel(concurrency_limit_manifest_files);
+
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
 
+        let delete_file_idx_and_tx: Option<(
+            DeleteFileIndexRefReceiver,
+            Sender<Result<FileScanTaskDeleteFile>>,
+        )> = if self.delete_file_processing_enabled {
+            // used to stream delete files into the DeleteFileIndex
+            let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries);
+
+            let delete_file_index_rx = DeleteFileIndex::from_receiver(delete_file_rx);
+            Some((delete_file_index_rx, delete_file_tx))
+        } else {
+            None
+        };
+
         let manifest_list = self.plan_context.get_manifest_list().await?;
 
-        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out
-        // partitions cannot match the scan's filter
-        let manifest_file_contexts = self
-            .plan_context
-            .build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?;
+        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
+        // whose partitions cannot match this
+        // scan's filter
+        let manifest_file_contexts = self.plan_context.build_manifest_file_contexts(
+            manifest_list,
+            manifest_entry_data_ctx_tx,
+            delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
+                (delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
+            }),
+        )?;
 
         let mut channel_for_manifest_error = file_scan_task_tx.clone();
 
@@ -385,17 +426,45 @@ impl TableScan {
             }
         });
 
-        let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();
+        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
+
+        if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
+            let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
+
+            // Process the delete file [`ManifestEntry`] stream in parallel
+            spawn(async move {
+                let result = manifest_entry_delete_ctx_rx
+                    .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
+                    .try_for_each_concurrent(
+                        concurrency_limit_manifest_entries,
+                        |(manifest_entry_context, tx)| async move {
+                            spawn(async move {
+                                Self::process_delete_manifest_entry(manifest_entry_context, tx)
+                                    .await
+                            })
+                            .await
+                        },
+                    )
+                    .await;
+
+                if let Err(error) = result {
+                    let _ = channel_for_delete_manifest_entry_error
+                        .send(Err(error))
+                        .await;
+                }
+            })
+            .await;
+        }
 
-        // Process the [`ManifestEntry`] stream in parallel
+        // Process the data file [`ManifestEntry`] stream in parallel
         spawn(async move {
-            let result = manifest_entry_ctx_rx
+            let result = manifest_entry_data_ctx_rx
                 .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                 .try_for_each_concurrent(
                     concurrency_limit_manifest_entries,
                     |(manifest_entry_context, tx)| async move {
                         spawn(async move {
-                            Self::process_manifest_entry(manifest_entry_context, tx).await
+                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                         })
                         .await
                     },
@@ -403,7 +472,7 @@ impl TableScan {
                 .await;
 
             if let Err(error) = result {
-                let _ = channel_for_manifest_entry_error.send(Err(error)).await;
+                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
             }
         });
 
@@ -437,7 +506,7 @@ impl TableScan {
         &self.plan_context.snapshot
     }
 
-    async fn process_manifest_entry(
+    async fn process_data_manifest_entry(
         manifest_entry_context: ManifestEntryContext,
         mut file_scan_task_tx: Sender<Result<FileScanTask>>,
     ) -> Result<()> {
@@ -446,12 +515,11 @@ impl TableScan {
             return Ok(());
         }
 
-        // abort the plan if we encounter a manifest entry whose data file's
-        // content type is currently unsupported
+        // abort the plan if we encounter a manifest entry for a delete file
         if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
             return Err(Error::new(
                 ErrorKind::FeatureUnsupported,
-                "Only Data files currently supported",
+                "Encountered an entry for a delete file in a data file 
manifest",
             ));
         }
 
@@ -489,7 +557,54 @@ impl TableScan {
         // entire plan without getting filtered out. Create a corresponding
         // FileScanTask and push it to the result stream
         file_scan_task_tx
-            .send(Ok(manifest_entry_context.into_file_scan_task()))
+            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
+            .await?;
+
+        Ok(())
+    }
+
+    async fn process_delete_manifest_entry(
+        manifest_entry_context: ManifestEntryContext,
+        mut file_scan_task_delete_file_tx: Sender<Result<FileScanTaskDeleteFile>>,
+    ) -> Result<()> {
+        // skip processing this manifest entry if it has been marked as deleted
+        if !manifest_entry_context.manifest_entry.is_alive() {
+            return Ok(());
+        }
+
+        // abort the plan if we encounter a manifest entry that is not for a delete file
+        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Encountered an entry for a data file in a delete manifest",
+            ));
+        }
+
+        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
+            let expression_evaluator_cache =
+                manifest_entry_context.expression_evaluator_cache.as_ref();
+
+            let expression_evaluator = expression_evaluator_cache.get(
+                manifest_entry_context.partition_spec_id,
+                &bound_predicates.partition_bound_predicate,
+            )?;
+
+            // skip any delete file whose partition data indicates that it can't contain
+            // any rows that match this scan's filter
+            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
+                return Ok(());
+            }
+        }
+
+        file_scan_task_delete_file_tx
+            .send(Ok(FileScanTaskDeleteFile {
+                file_path: manifest_entry_context
+                    .manifest_entry
+                    .file_path()
+                    .to_string(),
+                file_type: manifest_entry_context.manifest_entry.content_type(),
+                partition_spec_id: manifest_entry_context.partition_spec_id,
+            }))
             .await?;
 
         Ok(())
@@ -513,6 +628,7 @@ struct ManifestFileContext {
     object_cache: Arc<ObjectCache>,
     snapshot_schema: SchemaRef,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
+    delete_file_index: Option<DeleteFileIndexRefReceiver>,
 }
 
 /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -525,6 +641,7 @@ struct ManifestEntryContext {
     bound_predicates: Option<Arc<BoundPredicates>>,
     partition_spec_id: i32,
     snapshot_schema: SchemaRef,
+    delete_file_index: Option<DeleteFileIndexRefReceiver>,
 }
 
 impl ManifestFileContext {
@@ -539,6 +656,7 @@ impl ManifestFileContext {
             field_ids,
             mut sender,
             expression_evaluator_cache,
+            delete_file_index,
             ..
         } = self;
 
@@ -546,13 +664,14 @@ impl ManifestFileContext {
 
         for manifest_entry in manifest.entries() {
             let manifest_entry_context = ManifestEntryContext {
-                // TODO: refactor to avoid clone
+                // TODO: refactor to avoid the expensive ManifestEntry clone
                 manifest_entry: manifest_entry.clone(),
                 expression_evaluator_cache: expression_evaluator_cache.clone(),
                 field_ids: field_ids.clone(),
                 partition_spec_id: manifest_file.partition_spec_id,
                 bound_predicates: bound_predicates.clone(),
                 snapshot_schema: snapshot_schema.clone(),
+                delete_file_index: delete_file_index.clone(),
             };
 
             sender
@@ -568,8 +687,34 @@ impl ManifestFileContext {
 impl ManifestEntryContext {
     /// consume this `ManifestEntryContext`, returning a `FileScanTask`
     /// created from it
-    fn into_file_scan_task(self) -> FileScanTask {
-        FileScanTask {
+    async fn into_file_scan_task(self) -> Result<FileScanTask> {
+        // let deletes = self.get_deletes().await?;
+
+        let deletes = if let Some(mut delete_file_index_rx) = self.delete_file_index {
+            let del_file_idx_opt = delete_file_index_rx
+                .wait_for(Option::is_some)
+                .await
+                .map_err(|_| Error::new(ErrorKind::Unexpected, "DeleteFileIndex recv error"))?;
+
+            match del_file_idx_opt.as_ref() {
+                Some(del_file_idx) => match del_file_idx.as_ref() {
+                    Ok(delete_file_idx) => {
+                        delete_file_idx.get_deletes_for_data_file(self.manifest_entry.data_file())
+                    }
+                    Err(err) => {
+                        return Err(Error::new(ErrorKind::Unexpected, err.message()));
+                    }
+                },
+
+                // the `wait_for(Option::is_some)` above means that we can
+                // never get a `None` here
+                None => unreachable!(),
+            }
+        } else {
+            vec![]
+        };
+
+        Ok(FileScanTask {
             start: 0,
             length: self.manifest_entry.file_size_in_bytes(),
             record_count: Some(self.manifest_entry.record_count()),
@@ -583,7 +728,9 @@ impl ManifestEntryContext {
             predicate: self
                 .bound_predicates
                 .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
-        }
+
+            deletes,
+        })
     }
 }
 
@@ -619,29 +766,33 @@ impl PlanContext {
     fn build_manifest_file_contexts(
         &self,
         manifest_list: Arc<ManifestList>,
-        sender: Sender<ManifestEntryContext>,
+        tx_data: Sender<ManifestEntryContext>,
+        delete_file_idx_and_tx: Option<(DeleteFileIndexRefReceiver, Sender<ManifestEntryContext>)>,
     ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
-        let entries = manifest_list.entries();
-
-        if entries
-            .iter()
-            .any(|e| e.content != ManifestContentType::Data)
-        {
-            return Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "Merge-on-read is not yet supported",
-            ));
-        }
+        let manifest_files = manifest_list.entries().iter();
 
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
-        if self.predicate.is_some() {
-            for manifest_file in entries {
+
+        for manifest_file in manifest_files {
+            let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
+                let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
+                    continue;
+                };
+                (Some(delete_file_idx.clone()), tx.clone())
+            } else {
+                (
+                    delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()),
+                    tx_data.clone(),
+                )
+            };
+
+            let partition_bound_predicate = if self.predicate.is_some() {
                 let partition_bound_predicate = self.get_partition_filter(manifest_file)?;
 
                 // evaluate the ManifestFile against the partition filter. Skip
                 // if it cannot contain any matching rows
-                if self
+                if !self
                     .manifest_evaluator_cache
                     .get(
                         manifest_file.partition_spec_id,
@@ -649,19 +800,22 @@ impl PlanContext {
                     )
                     .eval(manifest_file)?
                 {
-                    let mfc = self.create_manifest_file_context(
-                        manifest_file,
-                        Some(partition_bound_predicate),
-                        sender.clone(),
-                    );
-                    filtered_mfcs.push(Ok(mfc));
+                    continue;
                 }
-            }
-        } else {
-            for manifest_file in entries {
-                let mfc = self.create_manifest_file_context(manifest_file, 
None, sender.clone());
-                filtered_mfcs.push(Ok(mfc));
-            }
+
+                Some(partition_bound_predicate)
+            } else {
+                None
+            };
+
+            let mfc = self.create_manifest_file_context(
+                manifest_file,
+                partition_bound_predicate,
+                tx,
+                delete_file_idx,
+            );
+
+            filtered_mfcs.push(Ok(mfc));
         }
 
         Ok(Box::new(filtered_mfcs.into_iter()))
@@ -672,6 +826,7 @@ impl PlanContext {
         manifest_file: &ManifestFile,
         partition_filter: Option<Arc<BoundPredicate>>,
         sender: Sender<ManifestEntryContext>,
+        delete_file_index: Option<DeleteFileIndexRefReceiver>,
     ) -> ManifestFileContext {
         let bound_predicates =
             if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
@@ -693,6 +848,7 @@ impl PlanContext {
             snapshot_schema: self.snapshot_schema.clone(),
             field_ids: self.field_ids.clone(),
             expression_evaluator_cache: self.expression_evaluator_cache.clone(),
+            delete_file_index,
         }
     }
 }
@@ -919,8 +1075,10 @@ pub struct FileScanTask {
 
     /// The data file path corresponding to the task.
     pub data_file_path: String,
+
     /// The content type of the file to scan.
     pub data_file_content: DataContentType,
+
     /// The format of the file to scan.
     pub data_file_format: DataFileFormat,
 
@@ -931,6 +1089,22 @@ pub struct FileScanTask {
     /// The predicate to filter.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub predicate: Option<BoundPredicate>,
+
+    /// The list of delete files that may need to be applied to this data file
+    pub deletes: Vec<FileScanTaskDeleteFile>,
+}
+
+/// A delete file that may need to be applied to the data file in a scan task.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct FileScanTaskDeleteFile {
+    /// The delete file path
+    pub file_path: String,
+
+    /// The delete file content type (positional or equality deletes)
+    pub file_type: DataContentType,
+
+    /// The ID of the partition spec this delete file belongs to
+    pub partition_spec_id: i32,
 }
 
 impl FileScanTask {
@@ -1416,16 +1590,16 @@ pub mod tests {
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
             .await
             .unwrap();
-        let batche1: Vec<_> = batch_stream.try_collect().await.unwrap();
+        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
 
         let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
         let batch_stream = reader
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
             .await
             .unwrap();
-        let batche2: Vec<_> = batch_stream.try_collect().await.unwrap();
+        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
 
-        assert_eq!(batche1, batche2);
+        assert_eq!(batch_1, batch_2);
     }
 
     #[tokio::test]
@@ -1867,6 +2041,7 @@ pub mod tests {
             schema: schema.clone(),
             record_count: Some(100),
             data_file_format: DataFileFormat::Parquet,
+            deletes: vec![],
         };
         test_fn(task);
 
@@ -1881,6 +2056,7 @@ pub mod tests {
             schema,
             record_count: None,
             data_file_format: DataFileFormat::Avro,
+            deletes: vec![],
         };
         test_fn(task);
     }
diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json b/crates/iceberg/testdata/example_table_metadata_v2.json
index 35230966..17bbd7d9 100644
--- a/crates/iceberg/testdata/example_table_metadata_v2.json
+++ b/crates/iceberg/testdata/example_table_metadata_v2.json
@@ -7,7 +7,12 @@
   "last-column-id": 3,
   "current-schema-id": 1,
   "schemas": [
-    {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", 
"required": true, "type": "long"}]},
+    {
+      "type": "struct",
+      "schema-id": 0,
+      "fields": [
+        {"id": 1, "name": "x", "required": true, "type": "long"}
+      ]},
     {
       "type": "struct",
       "schema-id": 1,
@@ -25,7 +30,14 @@
     }
   ],
   "default-spec-id": 0,
-  "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": 
"identity", "source-id": 1, "field-id": 1000}]}],
+  "partition-specs": [
+    {
+      "spec-id": 0,
+      "fields": [
+        {"name": "x", "transform": "identity", "source-id": 1, "field-id": 
1000}
+      ]
+    }
+  ],
   "last-partition-id": 1000,
   "default-sort-order-id": 3,
   "sort-orders": [
