amogh-jahagirdar commented on code in PR #1758:
URL: https://github.com/apache/iceberg-rust/pull/1758#discussion_r2443530169


##########
crates/iceberg/src/delete_file_index.rs:
##########
@@ -201,10 +203,282 @@ impl PopulatedDeleteFileIndex {
                     seq_num
                         .map(|seq_num| delete.manifest_entry.sequence_number() 
>= Some(seq_num))
                         .unwrap_or_else(|| true)
+                        && data_file.partition_spec_id == 
delete.partition_spec_id
                 })
                 .for_each(|delete| results.push(delete.as_ref().into()));
         }
 
         results
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use uuid::Uuid;
+
+    use super::*;
+    use crate::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, Literal, 
ManifestEntry, ManifestStatus,
+        Struct,
+    };
+
+    #[test]
+    fn test_delete_file_index_unpartitioned() {
+        let deletes: Vec<ManifestEntry> = vec![
+            build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
+            build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
+            build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
+            build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
+        ];

Review Comment:
   Yeah, exactly: it's to exercise the path that prompted this fix.



##########
crates/iceberg/src/delete_file_index.rs:
##########
@@ -201,10 +203,282 @@ impl PopulatedDeleteFileIndex {
                     seq_num
                         .map(|seq_num| delete.manifest_entry.sequence_number() 
>= Some(seq_num))
                         .unwrap_or_else(|| true)
+                        && data_file.partition_spec_id == 
delete.partition_spec_id
                 })
                 .for_each(|delete| results.push(delete.as_ref().into()));
         }
 
         results
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use uuid::Uuid;
+
+    use super::*;
+    use crate::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, Literal, 
ManifestEntry, ManifestStatus,
+        Struct,
+    };
+
+    #[test]
+    fn test_delete_file_index_unpartitioned() {
+        let deletes: Vec<ManifestEntry> = vec![
+            build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
+            build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
+            build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
+            build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
+        ];
+
+        let delete_file_paths: Vec<String> = deletes
+            .iter()
+            .map(|file| file.file_path().to_string())
+            .collect();
+
+        let delete_contexts: Vec<DeleteFileContext> = deletes
+            .into_iter()
+            .map(|entry| DeleteFileContext {
+                manifest_entry: entry.into(),
+                partition_spec_id: 0,
+            })
+            .collect();
+
+        let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
+
+        let data_file = build_unpartitioned_data_file();
+
+        // All deletes apply to sequence 0
+        let delete_files_to_apply_for_seq_0 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(0));
+        assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
+
+        // All deletes apply to sequence 3
+        let delete_files_to_apply_for_seq_3 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(3));
+        assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
+
+        // Last 3 deletes apply to sequence 4
+        let delete_files_to_apply_for_seq_4 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(4));
+        let actual_paths_to_apply_for_seq_4: Vec<String> = 
delete_files_to_apply_for_seq_4
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+
+        assert_eq!(
+            actual_paths_to_apply_for_seq_4,
+            delete_file_paths[delete_file_paths.len() - 3..]
+        );
+
+        // Last 3 deletes apply to sequence 5
+        let delete_files_to_apply_for_seq_5 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(5));
+        let actual_paths_to_apply_for_seq_5: Vec<String> = 
delete_files_to_apply_for_seq_5
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_seq_5,
+            delete_file_paths[delete_file_paths.len() - 3..]
+        );
+
+        // Only the last position delete applies to sequence 6
+        let delete_files_to_apply_for_seq_6 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(6));
+        let actual_paths_to_apply_for_seq_6: Vec<String> = 
delete_files_to_apply_for_seq_6
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_seq_6,
+            delete_file_paths[delete_file_paths.len() - 1..]
+        );
+
+        // The 2 global equality deletes should match against any partitioned 
file
+        let partitioned_file =
+            
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 1);
+
+        let delete_files_to_apply_for_partitioned_file =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(0));
+        let actual_paths_to_apply_for_partitioned_file: Vec<String> =
+            delete_files_to_apply_for_partitioned_file
+                .into_iter()
+                .map(|file| file.file_path)
+                .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_partitioned_file,
+            delete_file_paths[..2]
+        );
+    }
+
+    #[test]
+    fn test_delete_file_index_partitioned() {
+        let partition_one = Struct::from_iter([Some(Literal::long(100))]);
+        let spec_id = 1;
+        let deletes: Vec<ManifestEntry> = vec![
+            build_added_manifest_entry(4, 
&build_partitioned_eq_delete(&partition_one, spec_id)),
+            build_added_manifest_entry(6, 
&build_partitioned_eq_delete(&partition_one, spec_id)),
+            build_added_manifest_entry(5, 
&build_partitioned_pos_delete(&partition_one, spec_id)),
+            build_added_manifest_entry(6, 
&build_partitioned_pos_delete(&partition_one, spec_id)),
+        ];
+
+        let delete_file_paths: Vec<String> = deletes
+            .iter()
+            .map(|file| file.file_path().to_string())
+            .collect();
+
+        let delete_contexts: Vec<DeleteFileContext> = deletes
+            .into_iter()
+            .map(|entry| DeleteFileContext {
+                manifest_entry: entry.into(),
+                partition_spec_id: spec_id,
+            })
+            .collect();
+
+        let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
+
+        let partitioned_file =
+            
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 
spec_id);
+
+        // All deletes apply to sequence 0
+        let delete_files_to_apply_for_seq_0 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(0));
+        assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
+
+        // All deletes apply to sequence 3
+        let delete_files_to_apply_for_seq_3 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(3));
+        assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
+
+        // Last 3 deletes apply to sequence 4
+        let delete_files_to_apply_for_seq_4 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(4));
+        let actual_paths_to_apply_for_seq_4: Vec<String> = 
delete_files_to_apply_for_seq_4
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+
+        assert_eq!(
+            actual_paths_to_apply_for_seq_4,
+            delete_file_paths[delete_file_paths.len() - 3..]
+        );
+
+        // Last 3 deletes apply to sequence 5
+        let delete_files_to_apply_for_seq_5 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(5));
+        let actual_paths_to_apply_for_seq_5: Vec<String> = 
delete_files_to_apply_for_seq_5
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_seq_5,
+            delete_file_paths[delete_file_paths.len() - 3..]
+        );
+
+        // Only the last position delete applies to sequence 6
+        let delete_files_to_apply_for_seq_6 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(6));
+        let actual_paths_to_apply_for_seq_6: Vec<String> = 
delete_files_to_apply_for_seq_6
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_seq_6,
+            delete_file_paths[delete_file_paths.len() - 1..]
+        );
+
+        // Data file with different partition tuples does not match any delete 
files
+        let partitioned_second_file =
+            
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(200))]), 1);
+        let delete_files_to_apply_for_different_partition =
+            
delete_file_index.get_deletes_for_data_file(&partitioned_second_file, Some(0));
+        let actual_paths_to_apply_for_different_partition: Vec<String> =
+            delete_files_to_apply_for_different_partition
+                .into_iter()
+                .map(|file| file.file_path)
+                .collect();
+        assert!(actual_paths_to_apply_for_different_partition.is_empty());
+
+        // Data file with same tuple but different spec ID does not match any 
delete files
+        let partitioned_different_spec = 
build_partitioned_data_file(&partition_one, 2);
+        let delete_files_to_apply_for_different_spec =
+            
delete_file_index.get_deletes_for_data_file(&partitioned_different_spec, 
Some(0));
+        let actual_paths_to_apply_for_different_spec: Vec<String> =
+            delete_files_to_apply_for_different_spec
+                .into_iter()
+                .map(|file| file.file_path)
+                .collect();
+        assert!(actual_paths_to_apply_for_different_spec.is_empty());
+    }
+
+    fn build_unpartitioned_eq_delete() -> DataFile {
+        build_partitioned_eq_delete(&Struct::empty(), 0)
+    }
+
+    fn build_partitioned_eq_delete(partition: &Struct, spec_id: i32) -> 
DataFile {
+        DataFileBuilder::default()
+            .file_path(format!("{}_equality_delete.parquet", Uuid::new_v4()))
+            .file_format(DataFileFormat::Parquet)
+            .content(DataContentType::EqualityDeletes)
+            .equality_ids(Some(vec![1]))
+            .record_count(1)
+            .partition(partition.clone())
+            .partition_spec_id(spec_id)
+            .file_size_in_bytes(100)
+            .build()
+            .unwrap()
+    }
+
+    fn build_unpartitioned_pos_delete() -> DataFile {
+        build_partitioned_pos_delete(&Struct::empty(), 0)
+    }
+
+    fn build_partitioned_pos_delete(partition: &Struct, spec_id: i32) -> 
DataFile {
+        DataFileBuilder::default()
+            .file_path(format!("{}-dv.puffin", Uuid::new_v4()))
+            .file_format(DataFileFormat::Puffin)
+            .content(DataContentType::PositionDeletes)

Review Comment:
   Good point, yeah. We'll want to add the DV validation once the DV matching 
is added (matching against the actual data file location).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to