This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new fd089164 fix(reader): Support both position and equality delete files 
on the same FileScanTask (#1778)
fd089164 is described below

commit fd089164bff19fdd4b0d297363fc7808b6f6ea18
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Nov 4 20:41:29 2025 -0500

    fix(reader): Support both position and equality delete files on the same 
FileScanTask (#1778)
    
    ## What issue does this PR close?
    
    Partially addresses #1749.
    
    ## Rationale for this change
    
    This PR fixes a bug in delete file loading when a `FileScanTask`
    contains both positional and equality delete files. We hit this when
    running the Iceberg Java test suite via Comet in
    https://github.com/apache/datafusion-comet/pull/2528. The tests that
    failed were
    ```
    TestSparkExecutorCache > testMergeOnReadUpdate()
    TestSparkExecutorCache > testMergeOnReadMerge()
    TestSparkExecutorCache > testMergeOnReadDelete()
    ```
    
    **The Bug:**
    The condition in `try_start_eq_del_load` (delete_filter.rs:71-73) was
    inverted. It returned `None` when the equality delete file was not in
    the cache, causing the loader to skip loading it. When
    `build_equality_delete_predicate` was later called, it would fail with
    "Missing predicate for equality delete file".
    
    ## What changes are included in this PR?
    
    **The Fix:**
    - Inverted the condition so it returns `None` when the file is already
    in the cache (being loaded or loaded), preventing duplicate work across
    concurrent tasks
    - When the file is not in the cache, mark it as Loading and proceed with
    loading
    
    **Additional Changes:**
    - Added test case `test_load_deletes_with_mixed_types` that reproduces
    the bug scenario
    
    ## Are these changes tested?
    
    Yes, this PR includes a new unit test
    `test_load_deletes_with_mixed_types` that:
    - Creates a `FileScanTask` with both a positional delete file and an
    equality delete file
    - Verifies that `load_deletes` successfully processes both types
    - Verifies that `build_equality_delete_predicate` succeeds without the
    "Missing predicate" error
    - We hit this when running the Iceberg Java test suite via Comet in
    https://github.com/apache/datafusion-comet/pull/2528. I also confirmed
    that it fixes the tests in the Iceberg Java suite.
    
    The test would fail before this fix and passes after.
---
 .../src/arrow/caching_delete_file_loader.rs        | 113 +++++++++++++++++++++
 crates/iceberg/src/arrow/delete_filter.rs          |   4 +-
 2 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs 
b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 078635c9..8a3ab3a9 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -788,4 +788,117 @@ mod tests {
         assert_eq!(data_col.value(1), "d");
         assert_eq!(data_col.value(2), "g");
     }
+
+    /// Test loading a FileScanTask with BOTH positional and equality deletes.
+    /// Verifies the fix for the inverted condition that caused "Missing 
predicate for equality delete file" errors.
+    #[tokio::test]
+    async fn test_load_deletes_with_mixed_types() {
+        use crate::scan::FileScanTask;
+        use crate::spec::{DataFileFormat, Schema};
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path();
+        let file_io = 
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Create the data file schema
+        let data_file_schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    crate::spec::NestedField::optional(
+                        2,
+                        "y",
+                        
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                    )
+                    .into(),
+                    crate::spec::NestedField::optional(
+                        3,
+                        "z",
+                        
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Write positional delete file
+        let positional_delete_schema = 
crate::arrow::delete_filter::tests::create_pos_del_schema();
+        let file_path_values =
+            vec![format!("{}/data-1.parquet", 
table_location.to_str().unwrap()); 4];
+        let file_path_col = 
Arc::new(StringArray::from_iter_values(&file_path_values));
+        let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 
3]));
+
+        let positional_deletes_to_write =
+            RecordBatch::try_new(positional_delete_schema.clone(), vec![
+                file_path_col,
+                pos_col,
+            ])
+            .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let pos_del_path = format!("{}/pos-del-mixed.parquet", 
table_location.to_str().unwrap());
+        let file = File::create(&pos_del_path).unwrap();
+        let mut writer = ArrowWriter::try_new(
+            file,
+            positional_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+        writer.write(&positional_deletes_to_write).unwrap();
+        writer.close().unwrap();
+
+        // Write equality delete file
+        let eq_delete_path = 
setup_write_equality_delete_file_1(table_location.to_str().unwrap());
+
+        // Create FileScanTask with BOTH positional and equality deletes
+        let pos_del = FileScanTaskDeleteFile {
+            file_path: pos_del_path,
+            file_type: DataContentType::PositionDeletes,
+            partition_spec_id: 0,
+            equality_ids: None,
+        };
+
+        let eq_del = FileScanTaskDeleteFile {
+            file_path: eq_delete_path.clone(),
+            file_type: DataContentType::EqualityDeletes,
+            partition_spec_id: 0,
+            equality_ids: Some(vec![2, 3]), // Only use field IDs that exist 
in both schemas
+        };
+
+        let file_scan_task = FileScanTask {
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path: format!("{}/data-1.parquet", 
table_location.to_str().unwrap()),
+            data_file_format: DataFileFormat::Parquet,
+            schema: data_file_schema.clone(),
+            project_field_ids: vec![2, 3],
+            predicate: None,
+            deletes: vec![pos_del, eq_del],
+        };
+
+        // Load the deletes - should handle both types without error
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 
10);
+        let delete_filter = delete_file_loader
+            .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
+            .await
+            .unwrap()
+            .unwrap();
+
+        // Verify both delete types can be processed together
+        let result = delete_filter
+            .build_equality_delete_predicate(&file_scan_task)
+            .await;
+        assert!(
+            result.is_ok(),
+            "Failed to build equality delete predicate: {:?}",
+            result.err()
+        );
+    }
 }
diff --git a/crates/iceberg/src/arrow/delete_filter.rs 
b/crates/iceberg/src/arrow/delete_filter.rs
index b853baa9..4250974b 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -68,10 +68,12 @@ impl DeleteFilter {
     pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> 
Option<Arc<Notify>> {
         let mut state = self.state.write().unwrap();
 
-        if !state.equality_deletes.contains_key(file_path) {
+        // Skip if already loaded/loading - another task owns it
+        if state.equality_deletes.contains_key(file_path) {
             return None;
         }
 
+        // Mark as loading to prevent duplicate work
         let notifier = Arc::new(Notify::new());
         state
             .equality_deletes

Reply via email to the mailing list.