This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d33f3bb7 fix: global eq delete matching should apply to only strictly
older files, and fix partition scoped matching to consider spec id (#1758)
d33f3bb7 is described below
commit d33f3bb77ede1bf481bf71d9ddb45cb4cdcbd858
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Oct 20 10:48:51 2025 -0600
fix: global eq delete matching should apply to only strictly older files,
and fix partition scoped matching to consider spec id (#1758)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #1759.
## What changes are included in this PR?
This changes scan planning of delete files so that:
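1. A data file is only matched against global equality deletes that are
strictly newer than it (delete sequence number > data file sequence number),
i.e. a global equality delete only applies to strictly older data files.
Right now it looks like we incorrectly over-apply global equality deletes by
also matching when the sequence numbers are equal.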
2. Partition scoped deletes (both equality and position) are compared
correctly by also factoring in the spec ID. Comparing the partition tuple
alone is not enough; the partition spec ID of the delete file must match
that of the data file as well.
## Are these changes tested?
Added unit tests which are scoped to testing delete index matching
logic.
<!--
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
---
crates/iceberg/src/delete_file_index.rs | 292 +++++++++++++++++++++++++++++++-
1 file changed, 283 insertions(+), 9 deletions(-)
diff --git a/crates/iceberg/src/delete_file_index.rs
b/crates/iceberg/src/delete_file_index.rs
index d8f7a872..4f6fd284 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -42,7 +42,7 @@ enum DeleteFileIndexState {
#[derive(Debug)]
struct PopulatedDeleteFileIndex {
#[allow(dead_code)]
- global_deletes: Vec<Arc<DeleteFileContext>>,
+ global_equality_deletes: Vec<Arc<DeleteFileContext>>,
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
// TODO: do we need this?
@@ -65,7 +65,8 @@ impl DeleteFileIndex {
spawn({
let state = state.clone();
async move {
- let delete_files =
delete_file_stream.collect::<Vec<_>>().await;
+ let delete_files: Vec<DeleteFileContext> =
+ delete_file_stream.collect::<Vec<_>>().await;
let populated_delete_file_index =
PopulatedDeleteFileIndex::new(delete_files);
@@ -114,7 +115,7 @@ impl PopulatedDeleteFileIndex {
///
/// 1. The partition information is extracted from each delete file's
manifest entry.
/// 2. If the partition is empty and the delete file is not a positional
delete,
- /// it is added to the `global_deletes` vector
+ /// it is added to the `global_equality_deletes` vector
/// 3. Otherwise, the delete file is added to one of two hash maps based
on its content type.
fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
let mut eq_deletes_by_partition: HashMap<Struct,
Vec<Arc<DeleteFileContext>>> =
@@ -122,7 +123,7 @@ impl PopulatedDeleteFileIndex {
let mut pos_deletes_by_partition: HashMap<Struct,
Vec<Arc<DeleteFileContext>>> =
HashMap::default();
- let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
+ let mut global_equality_deletes: Vec<Arc<DeleteFileContext>> = vec![];
files.into_iter().for_each(|ctx| {
let arc_ctx = Arc::new(ctx);
@@ -133,7 +134,7 @@ impl PopulatedDeleteFileIndex {
if partition.fields().is_empty() {
// TODO: confirm we're good to skip here if we encounter a pos
del
if arc_ctx.manifest_entry.content_type() !=
DataContentType::PositionDeletes {
- global_deletes.push(arc_ctx);
+ global_equality_deletes.push(arc_ctx);
return;
}
}
@@ -153,7 +154,7 @@ impl PopulatedDeleteFileIndex {
});
PopulatedDeleteFileIndex {
- global_deletes,
+ global_equality_deletes,
eq_deletes_by_partition,
pos_deletes_by_partition,
}
@@ -167,12 +168,12 @@ impl PopulatedDeleteFileIndex {
) -> Vec<FileScanTaskDeleteFile> {
let mut results = vec![];
- self.global_deletes
+ self.global_equality_deletes
.iter()
- // filter that returns true if the provided delete file's sequence
number is **greater than or equal to** `seq_num`
+ // filter that returns true if the provided delete file's sequence
number is **greater than** `seq_num`
.filter(|&delete| {
seq_num
- .map(|seq_num| delete.manifest_entry.sequence_number() >=
Some(seq_num))
+ .map(|seq_num| delete.manifest_entry.sequence_number() >
Some(seq_num))
.unwrap_or_else(|| true)
})
.for_each(|delete| results.push(delete.as_ref().into()));
@@ -185,6 +186,7 @@ impl PopulatedDeleteFileIndex {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number()
> Some(seq_num))
.unwrap_or_else(|| true)
+ && data_file.partition_spec_id ==
delete.partition_spec_id
})
.for_each(|delete| results.push(delete.as_ref().into()));
}
@@ -201,6 +203,7 @@ impl PopulatedDeleteFileIndex {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number()
>= Some(seq_num))
.unwrap_or_else(|| true)
+ && data_file.partition_spec_id ==
delete.partition_spec_id
})
.for_each(|delete| results.push(delete.as_ref().into()));
}
@@ -208,3 +211,274 @@ impl PopulatedDeleteFileIndex {
results
}
}
+
+#[cfg(test)]
+mod tests {
+ use uuid::Uuid;
+
+ use super::*;
+ use crate::spec::{
+ DataContentType, DataFileBuilder, DataFileFormat, Literal,
ManifestEntry, ManifestStatus,
+ Struct,
+ };
+
+ #[test]
+ fn test_delete_file_index_unpartitioned() {
+ let deletes: Vec<ManifestEntry> = vec![
+ build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
+ build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
+ build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
+ build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
+ ];
+
+ let delete_file_paths: Vec<String> = deletes
+ .iter()
+ .map(|file| file.file_path().to_string())
+ .collect();
+
+ let delete_contexts: Vec<DeleteFileContext> = deletes
+ .into_iter()
+ .map(|entry| DeleteFileContext {
+ manifest_entry: entry.into(),
+ partition_spec_id: 0,
+ })
+ .collect();
+
+ let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
+
+ let data_file = build_unpartitioned_data_file();
+
+ // All deletes apply to sequence 0
+ let delete_files_to_apply_for_seq_0 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(0));
+ assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
+
+ // All deletes apply to sequence 3
+ let delete_files_to_apply_for_seq_3 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(3));
+ assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
+
+ // Last 3 deletes apply to sequence 4
+ let delete_files_to_apply_for_seq_4 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(4));
+ let actual_paths_to_apply_for_seq_4: Vec<String> =
delete_files_to_apply_for_seq_4
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+
+ assert_eq!(
+ actual_paths_to_apply_for_seq_4,
+ delete_file_paths[delete_file_paths.len() - 3..]
+ );
+
+ // Last 3 deletes apply to sequence 5
+ let delete_files_to_apply_for_seq_5 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(5));
+ let actual_paths_to_apply_for_seq_5: Vec<String> =
delete_files_to_apply_for_seq_5
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_seq_5,
+ delete_file_paths[delete_file_paths.len() - 3..]
+ );
+
+ // Only the last position delete applies to sequence 6
+ let delete_files_to_apply_for_seq_6 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(6));
+ let actual_paths_to_apply_for_seq_6: Vec<String> =
delete_files_to_apply_for_seq_6
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_seq_6,
+ delete_file_paths[delete_file_paths.len() - 1..]
+ );
+
+ // The 2 global equality deletes should match against any partitioned
file
+ let partitioned_file =
+
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 1);
+
+ let delete_files_to_apply_for_partitioned_file =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(0));
+ let actual_paths_to_apply_for_partitioned_file: Vec<String> =
+ delete_files_to_apply_for_partitioned_file
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_partitioned_file,
+ delete_file_paths[..2]
+ );
+ }
+
+ #[test]
+ fn test_delete_file_index_partitioned() {
+ let partition_one = Struct::from_iter([Some(Literal::long(100))]);
+ let spec_id = 1;
+ let deletes: Vec<ManifestEntry> = vec![
+ build_added_manifest_entry(4,
&build_partitioned_eq_delete(&partition_one, spec_id)),
+ build_added_manifest_entry(6,
&build_partitioned_eq_delete(&partition_one, spec_id)),
+ build_added_manifest_entry(5,
&build_partitioned_pos_delete(&partition_one, spec_id)),
+ build_added_manifest_entry(6,
&build_partitioned_pos_delete(&partition_one, spec_id)),
+ ];
+
+ let delete_file_paths: Vec<String> = deletes
+ .iter()
+ .map(|file| file.file_path().to_string())
+ .collect();
+
+ let delete_contexts: Vec<DeleteFileContext> = deletes
+ .into_iter()
+ .map(|entry| DeleteFileContext {
+ manifest_entry: entry.into(),
+ partition_spec_id: spec_id,
+ })
+ .collect();
+
+ let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
+
+ let partitioned_file =
+
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]),
spec_id);
+
+ // All deletes apply to sequence 0
+ let delete_files_to_apply_for_seq_0 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(0));
+ assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
+
+ // All deletes apply to sequence 3
+ let delete_files_to_apply_for_seq_3 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(3));
+ assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
+
+ // Last 3 deletes apply to sequence 4
+ let delete_files_to_apply_for_seq_4 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(4));
+ let actual_paths_to_apply_for_seq_4: Vec<String> =
delete_files_to_apply_for_seq_4
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+
+ assert_eq!(
+ actual_paths_to_apply_for_seq_4,
+ delete_file_paths[delete_file_paths.len() - 3..]
+ );
+
+ // Last 3 deletes apply to sequence 5
+ let delete_files_to_apply_for_seq_5 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(5));
+ let actual_paths_to_apply_for_seq_5: Vec<String> =
delete_files_to_apply_for_seq_5
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_seq_5,
+ delete_file_paths[delete_file_paths.len() - 3..]
+ );
+
+ // Only the last position delete applies to sequence 6
+ let delete_files_to_apply_for_seq_6 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(6));
+ let actual_paths_to_apply_for_seq_6: Vec<String> =
delete_files_to_apply_for_seq_6
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_seq_6,
+ delete_file_paths[delete_file_paths.len() - 1..]
+ );
+
+ // Data file with different partition tuples does not match any delete
files
+ let partitioned_second_file =
+
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(200))]), 1);
+ let delete_files_to_apply_for_different_partition =
+
delete_file_index.get_deletes_for_data_file(&partitioned_second_file, Some(0));
+ let actual_paths_to_apply_for_different_partition: Vec<String> =
+ delete_files_to_apply_for_different_partition
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert!(actual_paths_to_apply_for_different_partition.is_empty());
+
+ // Data file with same tuple but different spec ID does not match any
delete files
+ let partitioned_different_spec =
build_partitioned_data_file(&partition_one, 2);
+ let delete_files_to_apply_for_different_spec =
+
delete_file_index.get_deletes_for_data_file(&partitioned_different_spec,
Some(0));
+ let actual_paths_to_apply_for_different_spec: Vec<String> =
+ delete_files_to_apply_for_different_spec
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert!(actual_paths_to_apply_for_different_spec.is_empty());
+ }
+
+ fn build_unpartitioned_eq_delete() -> DataFile {
+ build_partitioned_eq_delete(&Struct::empty(), 0)
+ }
+
+ fn build_partitioned_eq_delete(partition: &Struct, spec_id: i32) ->
DataFile {
+ DataFileBuilder::default()
+ .file_path(format!("{}_equality_delete.parquet", Uuid::new_v4()))
+ .file_format(DataFileFormat::Parquet)
+ .content(DataContentType::EqualityDeletes)
+ .equality_ids(Some(vec![1]))
+ .record_count(1)
+ .partition(partition.clone())
+ .partition_spec_id(spec_id)
+ .file_size_in_bytes(100)
+ .build()
+ .unwrap()
+ }
+
+ fn build_unpartitioned_pos_delete() -> DataFile {
+ build_partitioned_pos_delete(&Struct::empty(), 0)
+ }
+
+ fn build_partitioned_pos_delete(partition: &Struct, spec_id: i32) ->
DataFile {
+ DataFileBuilder::default()
+ .file_path(format!("{}-pos-delete.parquet", Uuid::new_v4()))
+ .file_format(DataFileFormat::Parquet)
+ .content(DataContentType::PositionDeletes)
+ .record_count(1)
+ .referenced_data_file(Some("/some-data-file.parquet".to_string()))
+ .partition(partition.clone())
+ .partition_spec_id(spec_id)
+ .file_size_in_bytes(100)
+ .build()
+ .unwrap()
+ }
+
+ fn build_unpartitioned_data_file() -> DataFile {
+ DataFileBuilder::default()
+ .file_path(format!("{}-data.parquet", Uuid::new_v4()))
+ .file_format(DataFileFormat::Parquet)
+ .content(DataContentType::Data)
+ .record_count(100)
+ .partition(Struct::empty())
+ .partition_spec_id(0)
+ .file_size_in_bytes(100)
+ .build()
+ .unwrap()
+ }
+
+ fn build_partitioned_data_file(partition_value: &Struct, spec_id: i32) ->
DataFile {
+ DataFileBuilder::default()
+ .file_path(format!("{}-data.parquet", Uuid::new_v4()))
+ .file_format(DataFileFormat::Parquet)
+ .content(DataContentType::Data)
+ .record_count(100)
+ .partition(partition_value.clone())
+ .partition_spec_id(spec_id)
+ .file_size_in_bytes(100)
+ .build()
+ .unwrap()
+ }
+
+ fn build_added_manifest_entry(data_seq_number: i64, file: &DataFile) ->
ManifestEntry {
+ ManifestEntry::builder()
+ .status(ManifestStatus::Added)
+ .sequence_number(data_seq_number)
+ .data_file(file.clone())
+ .build()
+ }
+}