This is an automated email from the ASF dual-hosted git repository.

sdd pushed a commit to branch feature/table-scan-delete-file-handling
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git

commit 93112187fe810ba93dc6ed03be0e4f600494e4ca
Author: Scott Donnelly <sc...@donnel.ly>
AuthorDate: Sat Dec 21 20:46:06 2024 +0000

    refactor: introduce DeleteFileContext.
    
    Used to encapsulate enough information for DeleteFileIndex to be able to 
perform the required filtering of delete files
---
 crates/iceberg/src/delete_file_index.rs | 129 +++++++++++++++++++++++---------
 crates/iceberg/src/scan.rs              |  33 ++++----
 2 files changed, 110 insertions(+), 52 deletions(-)

diff --git a/crates/iceberg/src/delete_file_index.rs 
b/crates/iceberg/src/delete_file_index.rs
index 5265c08f..6bda14a8 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -2,11 +2,11 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use futures::channel::mpsc;
-use futures::{StreamExt, TryStreamExt};
+use futures::{StreamExt, TryFutureExt, TryStreamExt};
 use tokio::sync::watch;
 
-use crate::scan::FileScanTaskDeleteFile;
-use crate::spec::{DataContentType, DataFile};
+use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
+use crate::spec::{DataContentType, DataFile, Struct};
 use crate::Result;
 
 type DeleteFileIndexRef = Arc<Result<DeleteFileIndex>>;
@@ -16,17 +16,15 @@ pub(crate) type DeleteFileIndexRefReceiver = 
watch::Receiver<Option<DeleteFileIn
 #[derive(Debug)]
 pub(crate) struct DeleteFileIndex {
     #[allow(dead_code)]
-    global_deletes: Vec<Arc<FileScanTaskDeleteFile>>,
-    #[allow(dead_code)]
-    equality_deletes_by_partition: HashMap<i32, 
Vec<Arc<FileScanTaskDeleteFile>>>,
-    #[allow(dead_code)]
-    positional_deletes_by_partition: HashMap<i32, 
Vec<Arc<FileScanTaskDeleteFile>>>,
-    positional_deletes_by_path: HashMap<String, 
Vec<Arc<FileScanTaskDeleteFile>>>,
+    global_deletes: Vec<Arc<DeleteFileContext>>,
+    eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
+    pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
+    pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
 }
 
 impl DeleteFileIndex {
-    pub(crate) fn from_receiver(
-        receiver: mpsc::Receiver<Result<FileScanTaskDeleteFile>>,
+    pub(crate) fn from_del_file_chan(
+        receiver: mpsc::Receiver<Result<DeleteFileContext>>,
     ) -> watch::Receiver<Option<DeleteFileIndexRef>> {
         let (tx, rx) = watch::channel(None);
 
@@ -41,43 +39,55 @@ impl DeleteFileIndex {
         rx
     }
 
-    fn from_delete_files(files: Vec<FileScanTaskDeleteFile>) -> Self {
-        let mut equality_deletes_by_partition: HashMap<i32, 
Vec<Arc<FileScanTaskDeleteFile>>> =
+    fn from_delete_files(files: Vec<DeleteFileContext>) -> Self {
+        let mut eq_deletes_by_partition: HashMap<Struct, 
Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
-        let mut positional_deletes_by_partition: HashMap<i32, 
Vec<Arc<FileScanTaskDeleteFile>>> =
+        let mut pos_deletes_by_partition: HashMap<Struct, 
Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
-        let mut positional_deletes_by_path: HashMap<String, 
Vec<Arc<FileScanTaskDeleteFile>>> =
+        let mut pos_deletes_by_path: HashMap<String, 
Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
 
-        files.into_iter().for_each(|file| {
-            let arc_file = Arc::new(file);
-            match arc_file.file_type {
+        files.into_iter().for_each(|del_file_ctx| {
+            let arc_del_file_ctx = Arc::new(del_file_ctx);
+            match arc_del_file_ctx.manifest_entry.content_type() {
                 DataContentType::PositionDeletes => {
                     // TODO: implement logic from 
ContentFileUtil.referencedDataFile
                     // see 
https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54
                     let referenced_data_file_path = "TODO".to_string();
 
-                    positional_deletes_by_path
+                    pos_deletes_by_path
                         .entry(referenced_data_file_path)
                         .and_modify(|entry| {
-                            entry.push(arc_file.clone());
+                            entry.push(arc_del_file_ctx.clone());
                         })
-                        .or_insert(vec![arc_file.clone()]);
-
-                    positional_deletes_by_partition
-                        .entry(arc_file.partition_spec_id)
+                        .or_insert(vec![arc_del_file_ctx.clone()]);
+
+                    pos_deletes_by_partition
+                        .entry(
+                            arc_del_file_ctx
+                                .manifest_entry
+                                .data_file()
+                                .partition()
+                                .clone(),
+                        )
                         .and_modify(|entry| {
-                            entry.push(arc_file.clone());
+                            entry.push(arc_del_file_ctx.clone());
                         })
-                        .or_insert(vec![arc_file.clone()]);
+                        .or_insert(vec![arc_del_file_ctx.clone()]);
                 }
                 DataContentType::EqualityDeletes => {
-                    equality_deletes_by_partition
-                        .entry(arc_file.partition_spec_id)
+                    eq_deletes_by_partition
+                        .entry(
+                            arc_del_file_ctx
+                                .manifest_entry
+                                .data_file()
+                                .partition()
+                                .clone(),
+                        )
                         .and_modify(|entry| {
-                            entry.push(arc_file.clone());
+                            entry.push(arc_del_file_ctx.clone());
                         })
-                        .or_insert(vec![arc_file.clone()]);
+                        .or_insert(vec![arc_del_file_ctx.clone()]);
                 }
                 _ => unreachable!(),
             }
@@ -85,9 +95,9 @@ impl DeleteFileIndex {
 
         DeleteFileIndex {
             global_deletes: vec![],
-            equality_deletes_by_partition,
-            positional_deletes_by_partition,
-            positional_deletes_by_path,
+            eq_deletes_by_partition,
+            pos_deletes_by_partition,
+            pos_deletes_by_path,
         }
     }
 
@@ -95,15 +105,60 @@ impl DeleteFileIndex {
     pub(crate) fn get_deletes_for_data_file(
         &self,
         data_file: &DataFile,
+        seq_num: Option<i64>,
     ) -> Vec<FileScanTaskDeleteFile> {
         let mut results = vec![];
 
-        if let Some(positional_deletes) = 
self.positional_deletes_by_path.get(data_file.file_path())
-        {
-            results.extend(positional_deletes.iter().map(|i| 
i.as_ref().clone()))
+        if let Some(deletes) = 
self.pos_deletes_by_path.get(data_file.file_path()) {
+            deletes
+                .iter()
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() 
> Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| {
+                    results.push(FileScanTaskDeleteFile {
+                        file_path: 
delete.manifest_entry.file_path().to_string(),
+                        file_type: delete.manifest_entry.content_type(),
+                        partition_spec_id: delete.partition_spec_id,
+                    })
+                });
         }
 
-        // TODO: equality deletes
+        if let Some(deletes) = 
self.pos_deletes_by_partition.get(data_file.partition()) {
+            deletes
+                .iter()
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() 
> Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| {
+                    results.push(FileScanTaskDeleteFile {
+                        file_path: 
delete.manifest_entry.file_path().to_string(),
+                        file_type: delete.manifest_entry.content_type(),
+                        partition_spec_id: delete.partition_spec_id,
+                    })
+                });
+        }
+
+        if let Some(deletes) = 
self.eq_deletes_by_partition.get(data_file.partition()) {
+            deletes
+                .iter()
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() 
> Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| {
+                    results.push(FileScanTaskDeleteFile {
+                        file_path: 
delete.manifest_entry.file_path().to_string(),
+                        file_type: delete.manifest_entry.content_type(),
+                        partition_spec_id: delete.partition_spec_id,
+                    })
+                });
+        }
 
         results
     }
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index acf10f5e..b3bbb551 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -385,14 +385,14 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = 
channel(concurrency_limit_manifest_entries);
 
+        // DeleteFileIndexRefReceiver is a watch channel receiver that will
+        // be notified when the DeleteFileIndex is ready.
         let delete_file_idx_and_tx: Option<(
             DeleteFileIndexRefReceiver,
-            Sender<Result<FileScanTaskDeleteFile>>,
+            Sender<Result<DeleteFileContext>>,
         )> = if self.delete_file_processing_enabled {
-            // used to stream delete files into the DeleteFileIndex
             let (delete_file_tx, delete_file_rx) = 
channel(concurrency_limit_manifest_entries);
-
-            let delete_file_index_rx = 
DeleteFileIndex::from_receiver(delete_file_rx);
+            let delete_file_index_rx = 
DeleteFileIndex::from_del_file_chan(delete_file_rx);
             Some((delete_file_index_rx, delete_file_tx))
         } else {
             None
@@ -565,7 +565,7 @@ impl TableScan {
 
     async fn process_delete_manifest_entry(
         manifest_entry_context: ManifestEntryContext,
-        mut file_scan_task_delete_file_tx: 
Sender<Result<FileScanTaskDeleteFile>>,
+        mut delete_file_ctx_tx: Sender<Result<DeleteFileContext>>,
     ) -> Result<()> {
         // skip processing this manifest entry if it has been marked as deleted
         if !manifest_entry_context.manifest_entry.is_alive() {
@@ -596,13 +596,9 @@ impl TableScan {
             }
         }
 
-        file_scan_task_delete_file_tx
-            .send(Ok(FileScanTaskDeleteFile {
-                file_path: manifest_entry_context
-                    .manifest_entry
-                    .file_path()
-                    .to_string(),
-                file_type: 
manifest_entry_context.manifest_entry.content_type(),
+        delete_file_ctx_tx
+            .send(Ok(DeleteFileContext {
+                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                 partition_spec_id: manifest_entry_context.partition_spec_id,
             }))
             .await?;
@@ -698,9 +694,10 @@ impl ManifestEntryContext {
 
             match del_file_idx_opt.as_ref() {
                 Some(del_file_idx) => match del_file_idx.as_ref() {
-                    Ok(delete_file_idx) => {
-                        
delete_file_idx.get_deletes_for_data_file(self.manifest_entry.data_file())
-                    }
+                    Ok(delete_file_idx) => 
delete_file_idx.get_deletes_for_data_file(
+                        self.manifest_entry.data_file(),
+                        self.manifest_entry.sequence_number(),
+                    ),
                     Err(err) => {
                         return Err(Error::new(ErrorKind::Unexpected, 
err.message()));
                     }
@@ -1107,6 +1104,12 @@ pub struct FileScanTaskDeleteFile {
     pub partition_spec_id: i32,
 }
 
+#[derive(Debug)]
+pub(crate) struct DeleteFileContext {
+    pub(crate) manifest_entry: ManifestEntryRef,
+    pub(crate) partition_spec_id: i32,
+}
+
 impl FileScanTask {
     /// Returns the data file path of this file scan task.
     pub fn data_file_path(&self) -> &str {

Reply via email to