kevinjqliu commented on code in PR #1758:
URL: https://github.com/apache/iceberg-rust/pull/1758#discussion_r2443490900
##########
crates/iceberg/src/delete_file_index.rs:
##########
@@ -201,10 +203,282 @@ impl PopulatedDeleteFileIndex {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number()
>= Some(seq_num))
.unwrap_or_else(|| true)
+ && data_file.partition_spec_id ==
delete.partition_spec_id
Review Comment:

👍
Partition value matching is handled by
`self.pos_deletes_by_partition.get(data_file.partition())`.
##########
crates/iceberg/src/delete_file_index.rs:
##########
@@ -201,10 +203,282 @@ impl PopulatedDeleteFileIndex {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number()
>= Some(seq_num))
.unwrap_or_else(|| true)
+ && data_file.partition_spec_id ==
delete.partition_spec_id
})
.for_each(|delete| results.push(delete.as_ref().into()));
}
results
}
}
+
+#[cfg(test)]
+mod tests {
+ use uuid::Uuid;
+
+ use super::*;
+ use crate::spec::{
+ DataContentType, DataFileBuilder, DataFileFormat, Literal,
ManifestEntry, ManifestStatus,
+ Struct,
+ };
+
+ #[test]
+ fn test_delete_file_index_unpartitioned() {
+ let deletes: Vec<ManifestEntry> = vec![
+ build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
+ build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
+ build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
+ build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
+ ];
Review Comment:
```suggestion
build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
];
```
nit for readability, feel free to ignore
##########
crates/iceberg/src/delete_file_index.rs:
##########
@@ -201,10 +203,282 @@ impl PopulatedDeleteFileIndex {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number()
>= Some(seq_num))
.unwrap_or_else(|| true)
+ && data_file.partition_spec_id ==
delete.partition_spec_id
})
.for_each(|delete| results.push(delete.as_ref().into()));
}
results
}
}
+
+#[cfg(test)]
+mod tests {
+ use uuid::Uuid;
+
+ use super::*;
+ use crate::spec::{
+ DataContentType, DataFileBuilder, DataFileFormat, Literal,
ManifestEntry, ManifestStatus,
+ Struct,
+ };
+
+ #[test]
+ fn test_delete_file_index_unpartitioned() {
+ let deletes: Vec<ManifestEntry> = vec![
+ build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
+ build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
+ build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
+ build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
+ ];
Review Comment:
Oh haha, I see: the test relies on the first 2 entries being the equality
deletes, since it later asserts against `delete_file_paths[..2]`.
##########
crates/iceberg/src/delete_file_index.rs:
##########
Review Comment:
```suggestion
/// it is added to the `global_equality_deletes` vector
```
s/global_deletes/global_equality_deletes
##########
crates/iceberg/src/delete_file_index.rs:
##########
@@ -201,10 +203,282 @@ impl PopulatedDeleteFileIndex {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number()
>= Some(seq_num))
.unwrap_or_else(|| true)
+ && data_file.partition_spec_id ==
delete.partition_spec_id
})
.for_each(|delete| results.push(delete.as_ref().into()));
}
results
}
}
+
+#[cfg(test)]
+mod tests {
+ use uuid::Uuid;
+
+ use super::*;
+ use crate::spec::{
+ DataContentType, DataFileBuilder, DataFileFormat, Literal,
ManifestEntry, ManifestStatus,
+ Struct,
+ };
+
+ #[test]
+ fn test_delete_file_index_unpartitioned() {
+ let deletes: Vec<ManifestEntry> = vec![
+ build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
+ build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
+ build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
+ build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
+ ];
+
+ let delete_file_paths: Vec<String> = deletes
+ .iter()
+ .map(|file| file.file_path().to_string())
+ .collect();
+
+ let delete_contexts: Vec<DeleteFileContext> = deletes
+ .into_iter()
+ .map(|entry| DeleteFileContext {
+ manifest_entry: entry.into(),
+ partition_spec_id: 0,
+ })
+ .collect();
+
+ let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
+
+ let data_file = build_unpartitioned_data_file();
+
+ // All deletes apply to sequence 0
+ let delete_files_to_apply_for_seq_0 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(0));
+ assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
+
+ // All deletes apply to sequence 3
+ let delete_files_to_apply_for_seq_3 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(3));
+ assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
+
+ // Last 3 deletes apply to sequence 4
+ let delete_files_to_apply_for_seq_4 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(4));
+ let actual_paths_to_apply_for_seq_4: Vec<String> =
delete_files_to_apply_for_seq_4
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+
+ assert_eq!(
+ actual_paths_to_apply_for_seq_4,
+ delete_file_paths[delete_file_paths.len() - 3..]
+ );
+
+ // Last 3 deletes apply to sequence 5
+ let delete_files_to_apply_for_seq_5 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(5));
+ let actual_paths_to_apply_for_seq_5: Vec<String> =
delete_files_to_apply_for_seq_5
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_seq_5,
+ delete_file_paths[delete_file_paths.len() - 3..]
+ );
+
+ // Only the last position delete applies to sequence 6
+ let delete_files_to_apply_for_seq_6 =
+ delete_file_index.get_deletes_for_data_file(&data_file, Some(6));
+ let actual_paths_to_apply_for_seq_6: Vec<String> =
delete_files_to_apply_for_seq_6
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_seq_6,
+ delete_file_paths[delete_file_paths.len() - 1..]
+ );
+
+ // The 2 global equality deletes should match against any partitioned
file
+ let partitioned_file =
+
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 1);
+
+ let delete_files_to_apply_for_partitioned_file =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(0));
+ let actual_paths_to_apply_for_partitioned_file: Vec<String> =
+ delete_files_to_apply_for_partitioned_file
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_partitioned_file,
+ delete_file_paths[..2]
+ );
+ }
+
+ #[test]
+ fn test_delete_file_index_partitioned() {
+ let partition_one = Struct::from_iter([Some(Literal::long(100))]);
+ let spec_id = 1;
+ let deletes: Vec<ManifestEntry> = vec![
+ build_added_manifest_entry(4,
&build_partitioned_eq_delete(&partition_one, spec_id)),
+ build_added_manifest_entry(6,
&build_partitioned_eq_delete(&partition_one, spec_id)),
+ build_added_manifest_entry(5,
&build_partitioned_pos_delete(&partition_one, spec_id)),
+ build_added_manifest_entry(6,
&build_partitioned_pos_delete(&partition_one, spec_id)),
+ ];
+
+ let delete_file_paths: Vec<String> = deletes
+ .iter()
+ .map(|file| file.file_path().to_string())
+ .collect();
+
+ let delete_contexts: Vec<DeleteFileContext> = deletes
+ .into_iter()
+ .map(|entry| DeleteFileContext {
+ manifest_entry: entry.into(),
+ partition_spec_id: spec_id,
+ })
+ .collect();
+
+ let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
+
+ let partitioned_file =
+
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]),
spec_id);
+
+ // All deletes apply to sequence 0
+ let delete_files_to_apply_for_seq_0 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(0));
+ assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
+
+ // All deletes apply to sequence 3
+ let delete_files_to_apply_for_seq_3 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(3));
+ assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
+
+ // Last 3 deletes apply to sequence 4
+ let delete_files_to_apply_for_seq_4 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(4));
+ let actual_paths_to_apply_for_seq_4: Vec<String> =
delete_files_to_apply_for_seq_4
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+
+ assert_eq!(
+ actual_paths_to_apply_for_seq_4,
+ delete_file_paths[delete_file_paths.len() - 3..]
+ );
+
+ // Last 3 deletes apply to sequence 5
+ let delete_files_to_apply_for_seq_5 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(5));
+ let actual_paths_to_apply_for_seq_5: Vec<String> =
delete_files_to_apply_for_seq_5
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_seq_5,
+ delete_file_paths[delete_file_paths.len() - 3..]
+ );
+
+ // Only the last position delete applies to sequence 6
+ let delete_files_to_apply_for_seq_6 =
+ delete_file_index.get_deletes_for_data_file(&partitioned_file,
Some(6));
+ let actual_paths_to_apply_for_seq_6: Vec<String> =
delete_files_to_apply_for_seq_6
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert_eq!(
+ actual_paths_to_apply_for_seq_6,
+ delete_file_paths[delete_file_paths.len() - 1..]
+ );
+
+ // Data file with different partition tuples does not match any delete
files
+ let partitioned_second_file =
+
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(200))]), 1);
+ let delete_files_to_apply_for_different_partition =
+
delete_file_index.get_deletes_for_data_file(&partitioned_second_file, Some(0));
+ let actual_paths_to_apply_for_different_partition: Vec<String> =
+ delete_files_to_apply_for_different_partition
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert!(actual_paths_to_apply_for_different_partition.is_empty());
+
+ // Data file with same tuple but different spec ID does not match any
delete files
+ let partitioned_different_spec =
build_partitioned_data_file(&partition_one, 2);
+ let delete_files_to_apply_for_different_spec =
+
delete_file_index.get_deletes_for_data_file(&partitioned_different_spec,
Some(0));
+ let actual_paths_to_apply_for_different_spec: Vec<String> =
+ delete_files_to_apply_for_different_spec
+ .into_iter()
+ .map(|file| file.file_path)
+ .collect();
+ assert!(actual_paths_to_apply_for_different_spec.is_empty());
+ }
+
+ fn build_unpartitioned_eq_delete() -> DataFile {
+ build_partitioned_eq_delete(&Struct::empty(), 0)
+ }
+
+ fn build_partitioned_eq_delete(partition: &Struct, spec_id: i32) ->
DataFile {
+ DataFileBuilder::default()
+ .file_path(format!("{}_equality_delete.parquet", Uuid::new_v4()))
+ .file_format(DataFileFormat::Parquet)
+ .content(DataContentType::EqualityDeletes)
+ .equality_ids(Some(vec![1]))
+ .record_count(1)
+ .partition(partition.clone())
+ .partition_spec_id(spec_id)
+ .file_size_in_bytes(100)
+ .build()
+ .unwrap()
+ }
+
+ fn build_unpartitioned_pos_delete() -> DataFile {
+ build_partitioned_pos_delete(&Struct::empty(), 0)
+ }
+
+ fn build_partitioned_pos_delete(partition: &Struct, spec_id: i32) ->
DataFile {
+ DataFileBuilder::default()
+ .file_path(format!("{}-dv.puffin", Uuid::new_v4()))
+ .file_format(DataFileFormat::Puffin)
+ .content(DataContentType::PositionDeletes)
Review Comment:
nit: should we use position delete files instead of Puffin deletion vectors
(DVs), since we're testing v2 table behavior here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]