This is an automated email from the ASF dual-hosted git repository.

sdd pushed a commit to branch feature/table-scan-delete-file-handling
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git

commit 54d4ae243fa625d0a3977e1187b47f372addca8e
Author: Scott Donnelly <sc...@donnel.ly>
AuthorDate: Mon Dec 23 17:27:41 2024 +0000

    feat: global delete support, DeleteFileIndex
---
 crates/iceberg/src/delete_file_index.rs | 156 ++++++++++++++++----------------
 crates/iceberg/src/scan.rs              |  12 ++-
 2 files changed, 88 insertions(+), 80 deletions(-)

diff --git a/crates/iceberg/src/delete_file_index.rs 
b/crates/iceberg/src/delete_file_index.rs
index 34169063..8a534d1f 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -28,6 +28,7 @@ use futures::StreamExt;
 use crate::runtime::spawn;
 use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
 use crate::spec::{DataContentType, DataFile, Struct};
+use crate::{Error, ErrorKind, Result};
 
 /// Index of delete files
 #[derive(Clone, Debug)]
@@ -47,7 +48,10 @@ struct PopulatedDeleteFileIndex {
     global_deletes: Vec<Arc<DeleteFileContext>>,
     eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
-    pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
+    // TODO: do we need this?
+    // pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
+
+    // TODO: Deletion Vector support
 }
 
 impl DeleteFileIndex {
@@ -75,7 +79,7 @@ impl DeleteFileIndex {
 
     /// Gets all the delete files that apply to the specified data file.
     ///
-    /// Returns a future that resolves to a Vec<FileScanTaskDeleteFile>
+    /// Returns a future that resolves to a Result<Vec<FileScanTaskDeleteFile>>
     pub(crate) fn get_deletes_for_data_file<'a>(
         &self,
         data_file: &'a DataFile,
@@ -95,60 +99,41 @@ impl PopulatedDeleteFileIndex {
             HashMap::default();
         let mut pos_deletes_by_partition: HashMap<Struct, 
Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
-        let mut pos_deletes_by_path: HashMap<String, 
Vec<Arc<DeleteFileContext>>> =
-            HashMap::default();
 
-        files.into_iter().for_each(|del_file_ctx| {
-            let arc_del_file_ctx = Arc::new(del_file_ctx);
-            match arc_del_file_ctx.manifest_entry.content_type() {
-                DataContentType::PositionDeletes => {
-                    // TODO: implement logic from 
ContentFileUtil.referencedDataFile
-                    // see 
https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54
-                    let referenced_data_file_path = "TODO".to_string();
-
-                    pos_deletes_by_path
-                        .entry(referenced_data_file_path)
-                        .and_modify(|entry| {
-                            entry.push(arc_del_file_ctx.clone());
-                        })
-                        .or_insert(vec![arc_del_file_ctx.clone()]);
-
-                    pos_deletes_by_partition
-                        .entry(
-                            arc_del_file_ctx
-                                .manifest_entry
-                                .data_file()
-                                .partition()
-                                .clone(),
-                        )
-                        .and_modify(|entry| {
-                            entry.push(arc_del_file_ctx.clone());
-                        })
-                        .or_insert(vec![arc_del_file_ctx.clone()]);
-                }
-                DataContentType::EqualityDeletes => {
-                    eq_deletes_by_partition
-                        .entry(
-                            arc_del_file_ctx
-                                .manifest_entry
-                                .data_file()
-                                .partition()
-                                .clone(),
-                        )
-                        .and_modify(|entry| {
-                            entry.push(arc_del_file_ctx.clone());
-                        })
-                        .or_insert(vec![arc_del_file_ctx.clone()]);
+        let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
+
+        files.into_iter().for_each(|ctx| {
+            let arc_ctx = Arc::new(ctx);
+
+            let partition = arc_ctx.manifest_entry.data_file().partition();
+
+            // The spec states that "Equality delete files stored with an 
unpartitioned spec are applied as global deletes".
+            if partition.fields().is_empty() {
+                // TODO: confirm we're good to skip here if we encounter a pos 
del
+                if arc_ctx.manifest_entry.content_type() != 
DataContentType::PositionDeletes {
+                    global_deletes.push(arc_ctx);
+                    return;
                 }
-                _ => unreachable!(),
             }
+
+            let destination_map = match arc_ctx.manifest_entry.content_type() {
+                DataContentType::PositionDeletes => &mut 
pos_deletes_by_partition,
+                DataContentType::EqualityDeletes => &mut 
eq_deletes_by_partition,
+                _ => unreachable!(),
+            };
+
+            destination_map
+                .entry(partition.clone())
+                .and_modify(|entry| {
+                    entry.push(arc_ctx.clone());
+                })
+                .or_insert(vec![arc_ctx.clone()]);
         });
 
         PopulatedDeleteFileIndex {
-            global_deletes: vec![],
+            global_deletes,
             eq_deletes_by_partition,
             pos_deletes_by_partition,
-            pos_deletes_by_path,
         }
     }
 
@@ -158,33 +143,47 @@ impl PopulatedDeleteFileIndex {
         data_file: &DataFile,
         seq_num: Option<i64>,
     ) -> Vec<FileScanTaskDeleteFile> {
-        let mut deletes_queue = vec![];
-
-        if let Some(deletes) = 
self.pos_deletes_by_path.get(data_file.file_path()) {
-            deletes_queue.extend(deletes.iter());
-        }
-
-        if let Some(deletes) = 
self.pos_deletes_by_partition.get(data_file.partition()) {
-            deletes_queue.extend(deletes.iter());
-        }
+        let mut results = vec![];
 
-        if let Some(deletes) = 
self.eq_deletes_by_partition.get(data_file.partition()) {
-            deletes_queue.extend(deletes.iter());
-        }
-
-        deletes_queue
+        self.global_deletes
             .iter()
+            // filter that returns true if the provided delete file's sequence 
number is **greater than or equal to** `seq_num`
             .filter(|&delete| {
                 seq_num
-                    .map(|seq_num| delete.manifest_entry.sequence_number() > 
Some(seq_num))
+                    .map(|seq_num| delete.manifest_entry.sequence_number() >= 
Some(seq_num))
                     .unwrap_or_else(|| true)
             })
-            .map(|delete| FileScanTaskDeleteFile {
-                file_path: delete.manifest_entry.file_path().to_string(),
-                file_type: delete.manifest_entry.content_type(),
-                partition_spec_id: delete.partition_spec_id,
-            })
-            .collect()
+            .for_each(|delete| results.push(delete.as_ref().into()));
+
+        if let Some(deletes) = 
self.eq_deletes_by_partition.get(data_file.partition()) {
+            deletes
+                .iter()
+                // filter that returns true if the provided delete file's 
sequence number is **greater than or equal to** `seq_num`
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() 
>= Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| results.push(delete.as_ref().into()));
+        }
+
+        // TODO: the spec states that:
+        //     "The data file's file_path is equal to the delete file's 
referenced_data_file if it is non-null".
+        //     we're not yet doing that here. The referenced data file's name 
will also be present in the positional
+        //     delete file's file path column.
+        if let Some(deletes) = 
self.pos_deletes_by_partition.get(data_file.partition()) {
+            deletes
+                .iter()
+                // filter that returns true if the provided delete file's 
sequence number is **greater than** `seq_num`
+                .filter(|&delete| {
+                    seq_num
+                        .map(|seq_num| delete.manifest_entry.sequence_number() 
> Some(seq_num))
+                        .unwrap_or_else(|| true)
+                })
+                .for_each(|delete| results.push(delete.as_ref().into()));
+        }
+
+        results
     }
 }
 
@@ -196,18 +195,17 @@ pub(crate) struct DeletesForDataFile<'a> {
 }
 
 impl Future for DeletesForDataFile<'_> {
-    type Output = Vec<FileScanTaskDeleteFile>;
+    type Output = Result<Vec<FileScanTaskDeleteFile>>;
 
     fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> 
{
-        let Ok(guard) = self.state.try_read() else {
-            return Poll::Pending;
-        };
-
-        match guard.deref() {
-            DeleteFileIndexState::Populated(idx) => {
-                Poll::Ready(idx.get_deletes_for_data_file(self.data_file, 
self.seq_num))
-            }
-            _ => Poll::Pending,
+        match self.state.try_read() {
+            Ok(guard) => match guard.deref() {
+                DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok(
+                    idx.get_deletes_for_data_file(self.data_file, self.seq_num)
+                )),
+                _ => Poll::Pending,
+            },
+            Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, 
err.to_string()))),
         }
     }
 }
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 0656fba1..f1afeb2f 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -684,7 +684,7 @@ impl ManifestEntryContext {
                     self.manifest_entry.data_file(),
                     self.manifest_entry.sequence_number(),
                 )
-                .await
+                .await?
         } else {
             vec![]
         };
@@ -1088,6 +1088,16 @@ pub(crate) struct DeleteFileContext {
     pub(crate) partition_spec_id: i32,
 }
 
+impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
+    fn from(ctx: &DeleteFileContext) -> Self {
+        FileScanTaskDeleteFile {
+            file_path: ctx.manifest_entry.file_path().to_string(),
+            file_type: ctx.manifest_entry.content_type(),
+            partition_spec_id: ctx.partition_spec_id,
+        }
+    }
+}
+
 impl FileScanTask {
     /// Returns the data file path of this file scan task.
     pub fn data_file_path(&self) -> &str {

Reply via email to