This is an automated email from the ASF dual-hosted git repository. sdd pushed a commit to branch feature/table-scan-delete-file-handling in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
commit 54d4ae243fa625d0a3977e1187b47f372addca8e Author: Scott Donnelly <sc...@donnel.ly> AuthorDate: Mon Dec 23 17:27:41 2024 +0000 feat: global delete support, DeleteFileIndex --- crates/iceberg/src/delete_file_index.rs | 156 ++++++++++++++++---------------- crates/iceberg/src/scan.rs | 12 ++- 2 files changed, 88 insertions(+), 80 deletions(-) diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 34169063..8a534d1f 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -28,6 +28,7 @@ use futures::StreamExt; use crate::runtime::spawn; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, DataFile, Struct}; +use crate::{Error, ErrorKind, Result}; /// Index of delete files #[derive(Clone, Debug)] @@ -47,7 +48,10 @@ struct PopulatedDeleteFileIndex { global_deletes: Vec<Arc<DeleteFileContext>>, eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>, pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>, - pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>, + // TODO: do we need this? + // pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>, + + // TODO: Deletion Vector support } impl DeleteFileIndex { @@ -75,7 +79,7 @@ impl DeleteFileIndex { /// Gets all the delete files that apply to the specified data file. /// - /// Returns a future that resolves to a Vec<FileScanTaskDeleteFile> + /// Returns a future that resolves to a Result<Vec<FileScanTaskDeleteFile>> pub(crate) fn get_deletes_for_data_file<'a>( &self, data_file: &'a DataFile, @@ -95,60 +99,41 @@ impl PopulatedDeleteFileIndex { HashMap::default(); let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> = HashMap::default(); - let mut pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>> = - HashMap::default(); - files.into_iter().for_each(|del_file_ctx| { - let arc_del_file_ctx = Arc::new(del_file_ctx); - match arc_del_file_ctx.manifest_entry.content_type() { - DataContentType::PositionDeletes => { - // TODO: implement logic from ContentFileUtil.referencedDataFile - // see https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54 - let referenced_data_file_path = "TODO".to_string(); - - pos_deletes_by_path - .entry(referenced_data_file_path) - .and_modify(|entry| { - entry.push(arc_del_file_ctx.clone()); - }) - .or_insert(vec![arc_del_file_ctx.clone()]); - - pos_deletes_by_partition - .entry( - arc_del_file_ctx - .manifest_entry - .data_file() - .partition() - .clone(), - ) - .and_modify(|entry| { - entry.push(arc_del_file_ctx.clone()); - }) - .or_insert(vec![arc_del_file_ctx.clone()]); - } - DataContentType::EqualityDeletes => { - eq_deletes_by_partition - .entry( - arc_del_file_ctx - .manifest_entry - .data_file() - .partition() - .clone(), - ) - .and_modify(|entry| { - entry.push(arc_del_file_ctx.clone()); - }) - .or_insert(vec![arc_del_file_ctx.clone()]); + let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![]; + + files.into_iter().for_each(|ctx| { + let arc_ctx = Arc::new(ctx); + + let partition = arc_ctx.manifest_entry.data_file().partition(); + + // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes". + if partition.fields().is_empty() { + // TODO: confirm we're good to skip here if we encounter a pos del + if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes { + global_deletes.push(arc_ctx); + return; } - _ => unreachable!(), } + + let destination_map = match arc_ctx.manifest_entry.content_type() { + DataContentType::PositionDeletes => &mut pos_deletes_by_partition, + DataContentType::EqualityDeletes => &mut eq_deletes_by_partition, + _ => unreachable!(), + }; + + destination_map + .entry(partition.clone()) + .and_modify(|entry| { + entry.push(arc_ctx.clone()); + }) + .or_insert(vec![arc_ctx.clone()]); }); PopulatedDeleteFileIndex { - global_deletes: vec![], + global_deletes, eq_deletes_by_partition, pos_deletes_by_partition, - pos_deletes_by_path, } } @@ -158,33 +143,47 @@ impl PopulatedDeleteFileIndex { data_file: &DataFile, seq_num: Option<i64>, ) -> Vec<FileScanTaskDeleteFile> { - let mut deletes_queue = vec![]; - - if let Some(deletes) = self.pos_deletes_by_path.get(data_file.file_path()) { - deletes_queue.extend(deletes.iter()); - } - - if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) { - deletes_queue.extend(deletes.iter()); - } + let mut results = vec![]; - if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) { - deletes_queue.extend(deletes.iter()); - } - - deletes_queue + self.global_deletes .iter() + // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num` .filter(|&delete| { seq_num - .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) + .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num)) .unwrap_or_else(|| true) }) - .map(|delete| FileScanTaskDeleteFile { - file_path: delete.manifest_entry.file_path().to_string(), - file_type: delete.manifest_entry.content_type(), - partition_spec_id: delete.partition_spec_id, - }) - .collect() + .for_each(|delete| results.push(delete.as_ref().into())); + + if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) { + deletes + .iter() + // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num` + .filter(|&delete| { + seq_num + .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num)) + .unwrap_or_else(|| true) + }) + .for_each(|delete| results.push(delete.as_ref().into())); + } + + // TODO: the spec states that: + // "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null". + // we're not yet doing that here. The referenced data file's name will also be present in the positional + // delete file's file path column. + if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) { + deletes + .iter() + // filter that returns true if the provided delete file's sequence number is **greater thano** `seq_num` + .filter(|&delete| { + seq_num + .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) + .unwrap_or_else(|| true) + }) + .for_each(|delete| results.push(delete.as_ref().into())); + } + + results } } @@ -196,18 +195,17 @@ pub(crate) struct DeletesForDataFile<'a> { } impl Future for DeletesForDataFile<'_> { - type Output = Vec<FileScanTaskDeleteFile>; + type Output = Result<Vec<FileScanTaskDeleteFile>>; fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { - let Ok(guard) = self.state.try_read() else { - return Poll::Pending; - }; - - match guard.deref() { - DeleteFileIndexState::Populated(idx) => { - Poll::Ready(idx.get_deletes_for_data_file(self.data_file, self.seq_num)) - } - _ => Poll::Pending, + match self.state.try_read() { + Ok(guard) => match guard.deref() { + DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok( + idx.get_deletes_for_data_file(self.data_file, self.seq_num) + )), + _ => Poll::Pending, + }, + Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))), } } } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 0656fba1..f1afeb2f 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -684,7 +684,7 @@ impl ManifestEntryContext { self.manifest_entry.data_file(), self.manifest_entry.sequence_number(), ) - .await + .await? } else { vec![] }; @@ -1088,6 +1088,16 @@ pub(crate) struct DeleteFileContext { pub(crate) partition_spec_id: i32, } +impl From<&DeleteFileContext> for FileScanTaskDeleteFile { + fn from(ctx: &DeleteFileContext) -> Self { + FileScanTaskDeleteFile { + file_path: ctx.manifest_entry.file_path().to_string(), + file_type: ctx.manifest_entry.content_type(), + partition_spec_id: ctx.partition_spec_id, + } + } +} + impl FileScanTask { /// Returns the data file path of this file scan task. pub fn data_file_path(&self) -> &str {