This is an automated email from the ASF dual-hosted git repository. sdd pushed a commit to branch feature/table-scan-delete-file-handling in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
commit 93112187fe810ba93dc6ed03be0e4f600494e4ca Author: Scott Donnelly <sc...@donnel.ly> AuthorDate: Sat Dec 21 20:46:06 2024 +0000 refactor: introduce DeleteFileContext. Used to encapsulate enough information for DeleteFileIndex to be able to perform the required filtering of delete files --- crates/iceberg/src/delete_file_index.rs | 129 +++++++++++++++++++++++--------- crates/iceberg/src/scan.rs | 33 ++++---- 2 files changed, 110 insertions(+), 52 deletions(-) diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 5265c08f..6bda14a8 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -2,11 +2,11 @@ use std::collections::HashMap; use std::sync::Arc; use futures::channel::mpsc; -use futures::{StreamExt, TryStreamExt}; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; use tokio::sync::watch; -use crate::scan::FileScanTaskDeleteFile; -use crate::spec::{DataContentType, DataFile}; +use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; +use crate::spec::{DataContentType, DataFile, Struct}; use crate::Result; type DeleteFileIndexRef = Arc<Result<DeleteFileIndex>>; @@ -16,17 +16,15 @@ pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver<Option<DeleteFileIn #[derive(Debug)] pub(crate) struct DeleteFileIndex { #[allow(dead_code)] - global_deletes: Vec<Arc<FileScanTaskDeleteFile>>, - #[allow(dead_code)] - equality_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>>, - #[allow(dead_code)] - positional_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>>, - positional_deletes_by_path: HashMap<String, Vec<Arc<FileScanTaskDeleteFile>>>, + global_deletes: Vec<Arc<DeleteFileContext>>, + eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>, + pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>, + pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>, } impl DeleteFileIndex { - 
pub(crate) fn from_receiver( - receiver: mpsc::Receiver<Result<FileScanTaskDeleteFile>>, + pub(crate) fn from_del_file_chan( + receiver: mpsc::Receiver<Result<DeleteFileContext>>, ) -> watch::Receiver<Option<DeleteFileIndexRef>> { let (tx, rx) = watch::channel(None); @@ -41,43 +39,55 @@ impl DeleteFileIndex { rx } - fn from_delete_files(files: Vec<FileScanTaskDeleteFile>) -> Self { - let mut equality_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>> = + fn from_delete_files(files: Vec<DeleteFileContext>) -> Self { + let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> = HashMap::default(); - let mut positional_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>> = + let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> = HashMap::default(); - let mut positional_deletes_by_path: HashMap<String, Vec<Arc<FileScanTaskDeleteFile>>> = + let mut pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>> = HashMap::default(); - files.into_iter().for_each(|file| { - let arc_file = Arc::new(file); - match arc_file.file_type { + files.into_iter().for_each(|del_file_ctx| { + let arc_del_file_ctx = Arc::new(del_file_ctx); + match arc_del_file_ctx.manifest_entry.content_type() { DataContentType::PositionDeletes => { // TODO: implement logic from ContentFileUtil.referencedDataFile // see https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54 let referenced_data_file_path = "TODO".to_string(); - positional_deletes_by_path + pos_deletes_by_path .entry(referenced_data_file_path) .and_modify(|entry| { - entry.push(arc_file.clone()); + entry.push(arc_del_file_ctx.clone()); }) - .or_insert(vec![arc_file.clone()]); - - positional_deletes_by_partition - .entry(arc_file.partition_spec_id) + .or_insert(vec![arc_del_file_ctx.clone()]); + + pos_deletes_by_partition + .entry( + arc_del_file_ctx + 
.manifest_entry + .data_file() + .partition() + .clone(), + ) .and_modify(|entry| { - entry.push(arc_file.clone()); + entry.push(arc_del_file_ctx.clone()); }) - .or_insert(vec![arc_file.clone()]); + .or_insert(vec![arc_del_file_ctx.clone()]); } DataContentType::EqualityDeletes => { - equality_deletes_by_partition - .entry(arc_file.partition_spec_id) + eq_deletes_by_partition + .entry( + arc_del_file_ctx + .manifest_entry + .data_file() + .partition() + .clone(), + ) .and_modify(|entry| { - entry.push(arc_file.clone()); + entry.push(arc_del_file_ctx.clone()); }) - .or_insert(vec![arc_file.clone()]); + .or_insert(vec![arc_del_file_ctx.clone()]); } _ => unreachable!(), } @@ -85,9 +95,9 @@ impl DeleteFileIndex { DeleteFileIndex { global_deletes: vec![], - equality_deletes_by_partition, - positional_deletes_by_partition, - positional_deletes_by_path, + eq_deletes_by_partition, + pos_deletes_by_partition, + pos_deletes_by_path, } } @@ -95,15 +105,60 @@ impl DeleteFileIndex { pub(crate) fn get_deletes_for_data_file( &self, data_file: &DataFile, + seq_num: Option<i64>, ) -> Vec<FileScanTaskDeleteFile> { let mut results = vec![]; - if let Some(positional_deletes) = self.positional_deletes_by_path.get(data_file.file_path()) - { - results.extend(positional_deletes.iter().map(|i| i.as_ref().clone())) + if let Some(deletes) = self.pos_deletes_by_path.get(data_file.file_path()) { + deletes + .iter() + .filter(|&delete| { + seq_num + .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) + .unwrap_or_else(|| true) + }) + .for_each(|delete| { + results.push(FileScanTaskDeleteFile { + file_path: delete.manifest_entry.file_path().to_string(), + file_type: delete.manifest_entry.content_type(), + partition_spec_id: delete.partition_spec_id, + }) + }); } - // TODO: equality deletes + if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) { + deletes + .iter() + .filter(|&delete| { + seq_num + .map(|seq_num| 
delete.manifest_entry.sequence_number() > Some(seq_num)) + .unwrap_or_else(|| true) + }) + .for_each(|delete| { + results.push(FileScanTaskDeleteFile { + file_path: delete.manifest_entry.file_path().to_string(), + file_type: delete.manifest_entry.content_type(), + partition_spec_id: delete.partition_spec_id, + }) + }); + } + + if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) { + deletes + .iter() + .filter(|&delete| { + seq_num + .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) + .unwrap_or_else(|| true) + }) + .for_each(|delete| { + results.push(FileScanTaskDeleteFile { + file_path: delete.manifest_entry.file_path().to_string(), + file_type: delete.manifest_entry.content_type(), + partition_spec_id: delete.partition_spec_id, + }) + }); + } results } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index acf10f5e..b3bbb551 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -385,14 +385,14 @@ impl TableScan { // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); + // DeleteFileIndexRefReceiver is a watch channel receiver that will + // be notified when the DeleteFileIndex is ready. 
let delete_file_idx_and_tx: Option<( DeleteFileIndexRefReceiver, - Sender<Result<FileScanTaskDeleteFile>>, + Sender<Result<DeleteFileContext>>, )> = if self.delete_file_processing_enabled { - // used to stream delete files into the DeleteFileIndex let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries); - - let delete_file_index_rx = DeleteFileIndex::from_receiver(delete_file_rx); + let delete_file_index_rx = DeleteFileIndex::from_del_file_chan(delete_file_rx); Some((delete_file_index_rx, delete_file_tx)) } else { None @@ -565,7 +565,7 @@ impl TableScan { async fn process_delete_manifest_entry( manifest_entry_context: ManifestEntryContext, - mut file_scan_task_delete_file_tx: Sender<Result<FileScanTaskDeleteFile>>, + mut delete_file_ctx_tx: Sender<Result<DeleteFileContext>>, ) -> Result<()> { // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { @@ -596,13 +596,9 @@ impl TableScan { } } - file_scan_task_delete_file_tx - .send(Ok(FileScanTaskDeleteFile { - file_path: manifest_entry_context - .manifest_entry - .file_path() - .to_string(), - file_type: manifest_entry_context.manifest_entry.content_type(), + delete_file_ctx_tx + .send(Ok(DeleteFileContext { + manifest_entry: manifest_entry_context.manifest_entry.clone(), partition_spec_id: manifest_entry_context.partition_spec_id, })) .await?; @@ -698,9 +694,10 @@ impl ManifestEntryContext { match del_file_idx_opt.as_ref() { Some(del_file_idx) => match del_file_idx.as_ref() { - Ok(delete_file_idx) => { - delete_file_idx.get_deletes_for_data_file(self.manifest_entry.data_file()) - } + Ok(delete_file_idx) => delete_file_idx.get_deletes_for_data_file( + self.manifest_entry.data_file(), + self.manifest_entry.sequence_number(), + ), Err(err) => { return Err(Error::new(ErrorKind::Unexpected, err.message())); } @@ -1107,6 +1104,12 @@ pub struct FileScanTaskDeleteFile { pub partition_spec_id: i32, } +#[derive(Debug)] +pub(crate) 
struct DeleteFileContext { + pub(crate) manifest_entry: ManifestEntryRef, + pub(crate) partition_spec_id: i32, +} + impl FileScanTask { /// Returns the data file path of this file scan task. pub fn data_file_path(&self) -> &str {