This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 835f528f fix(reader): Equality delete files with partial schemas 
(containing only equality columns) (#1782)
835f528f is described below

commit 835f528f4dd1d700158ce5339e14bc1da04626d1
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Nov 4 04:06:05 2025 -0500

    fix(reader): Equality delete files with partial schemas (containing only 
equality columns) (#1782)
    
    ## What issue does this PR close?
    
    Partially address #1749.
    
    ## Rationale for this change
    
    Equality delete files with partial schemas (containing only equality
    columns) were hitting Arrow validation errors: "Column 'id' is declared
    as non-nullable but contains null values" in Iceberg Java’s
    TestSparkReaderDeletes suite. The bug occurs because `evolve_schema()`
    adds missing columns with NULL values, which fails when those columns
    are declared REQUIRED in the table schema.
    
    ## What changes are included in this PR?
    
    Change `evolve_schema()` to take `equality_ids`: per the Iceberg spec,
    schema evolution for equality deletes applies only to the
    `equality_ids` columns, not to all table columns.
    
    ## Are these changes tested?
    
    Yes, by the new test `test_partial_schema_equality_deletes_evolve_succeeds`.
---
 .../src/arrow/caching_delete_file_loader.rs        | 118 +++++++++++++++++++--
 crates/iceberg/src/arrow/delete_file_loader.rs     |  24 +++--
 2 files changed, 124 insertions(+), 18 deletions(-)

diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs 
b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 77f29b7f..078635c9 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -224,16 +224,22 @@ impl CachingDeleteFileLoader {
                 let (sender, receiver) = channel();
                 del_filter.insert_equality_delete(&task.file_path, receiver);
 
+                // Per the Iceberg spec, evolve schema for equality deletes 
but only for the
+                // equality_ids columns, not all table columns.
+                let equality_ids_vec = task.equality_ids.clone().unwrap();
+                let evolved_stream = BasicDeleteFileLoader::evolve_schema(
+                    basic_delete_file_loader
+                        .parquet_to_batch_stream(&task.file_path)
+                        .await?,
+                    schema,
+                    &equality_ids_vec,
+                )
+                .await?;
+
                 Ok(DeleteFileContext::FreshEqDel {
-                    batch_stream: BasicDeleteFileLoader::evolve_schema(
-                        basic_delete_file_loader
-                            .parquet_to_batch_stream(&task.file_path)
-                            .await?,
-                        schema,
-                    )
-                    .await?,
+                    batch_stream: evolved_stream,
                     sender,
-                    equality_ids: 
HashSet::from_iter(task.equality_ids.clone().unwrap()),
+                    equality_ids: HashSet::from_iter(equality_ids_vec),
                 })
             }
 
@@ -536,6 +542,7 @@ mod tests {
     use std::fs::File;
     use std::sync::Arc;
 
+    use arrow_array::cast::AsArray;
     use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, 
StringArray, StructArray};
     use arrow_schema::{DataType, Field, Fields};
     use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
@@ -686,4 +693,99 @@ mod tests {
         let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
         assert!(result.is_none()); // no pos dels for file 3
     }
+
+    /// Verifies that evolve_schema on partial-schema equality deletes works 
correctly
+    /// when only equality_ids columns are evolved, not all table columns.
+    ///
+    /// Per the [Iceberg 
spec](https://iceberg.apache.org/spec/#equality-delete-files),
+    /// equality delete files can contain only a subset of columns.
+    #[tokio::test]
+    async fn test_partial_schema_equality_deletes_evolve_succeeds() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+
+        // Create table schema with REQUIRED fields
+        let table_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    crate::spec::NestedField::required(
+                        1,
+                        "id",
+                        
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int),
+                    )
+                    .into(),
+                    crate::spec::NestedField::required(
+                        2,
+                        "data",
+                        
crate::spec::Type::Primitive(crate::spec::PrimitiveType::String),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Write equality delete file with PARTIAL schema (only 'data' column)
+        let delete_file_path = {
+            let data_vals = vec!["a", "d", "g"];
+            let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef;
+
+            let delete_schema = 
Arc::new(arrow_schema::Schema::new(vec![simple_field(
+                "data",
+                DataType::Utf8,
+                false,
+                "2", // field ID
+            )]));
+
+            let delete_batch = RecordBatch::try_new(delete_schema.clone(), 
vec![data_col]).unwrap();
+
+            let path = format!("{}/partial-eq-deletes.parquet", 
&table_location);
+            let file = File::create(&path).unwrap();
+            let props = WriterProperties::builder()
+                .set_compression(Compression::SNAPPY)
+                .build();
+            let mut writer =
+                ArrowWriter::try_new(file, delete_batch.schema(), 
Some(props)).unwrap();
+            writer.write(&delete_batch).expect("Writing batch");
+            writer.close().unwrap();
+            path
+        };
+
+        let file_io = 
FileIO::from_path(table_location).unwrap().build().unwrap();
+        let basic_delete_file_loader = 
BasicDeleteFileLoader::new(file_io.clone());
+
+        let batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&delete_file_path)
+            .await
+            .unwrap();
+
+        // Only evolve the equality_ids columns (field 2), not all table 
columns
+        let equality_ids = vec![2];
+        let evolved_stream =
+            BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema, 
&equality_ids)
+                .await
+                .unwrap();
+
+        let result = evolved_stream.try_collect::<Vec<_>>().await;
+
+        assert!(
+            result.is_ok(),
+            "Expected success when evolving only equality_ids columns, got 
error: {:?}",
+            result.err()
+        );
+
+        let batches = result.unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 1); // Only 'data' column
+
+        // Verify the actual values are preserved after schema evolution
+        let data_col = batch.column(0).as_string::<i32>();
+        assert_eq!(data_col.value(0), "a");
+        assert_eq!(data_col.value(1), "d");
+        assert_eq!(data_col.value(2), "g");
+    }
 }
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs 
b/crates/iceberg/src/arrow/delete_file_loader.rs
index f2c63cc5..c0b1392d 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -72,20 +72,17 @@ impl BasicDeleteFileLoader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
-    /// Evolves the schema of the RecordBatches from an equality delete file
+    /// Evolves the schema of the RecordBatches from an equality delete file.
+    ///
+    /// Per the [Iceberg 
spec](https://iceberg.apache.org/spec/#equality-delete-files),
+    /// only evolves the specified `equality_ids` columns, not all table 
columns.
     pub(crate) async fn evolve_schema(
         record_batch_stream: ArrowRecordBatchStream,
         target_schema: Arc<Schema>,
+        equality_ids: &[i32],
     ) -> Result<ArrowRecordBatchStream> {
-        let eq_ids = target_schema
-            .as_ref()
-            .field_id_to_name_map()
-            .keys()
-            .cloned()
-            .collect::<Vec<_>>();
-
         let mut record_batch_transformer =
-            RecordBatchTransformer::build(target_schema.clone(), &eq_ids);
+            RecordBatchTransformer::build(target_schema.clone(), equality_ids);
 
         let record_batch_stream = record_batch_stream.map(move |record_batch| {
             record_batch.and_then(|record_batch| {
@@ -106,7 +103,14 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
     ) -> Result<ArrowRecordBatchStream> {
         let raw_batch_stream = 
self.parquet_to_batch_stream(&task.file_path).await?;
 
-        Self::evolve_schema(raw_batch_stream, schema).await
+        // For equality deletes, only evolve the equality_ids columns.
+        // For positional deletes (equality_ids is None), use all field IDs.
+        let field_ids = match &task.equality_ids {
+            Some(ids) => ids.clone(),
+            None => schema.field_id_to_name_map().keys().cloned().collect(),
+        };
+
+        Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
     }
 }
 

Reply via email to