This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 12c4c211 feat(reader): null struct default values in create_column 
(#1847)
12c4c211 is described below

commit 12c4c2110174b76eb82e584cb7933efdc9afcecc
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Nov 13 05:21:02 2025 -0500

    feat(reader): null struct default values in create_column (#1847)
    
    Fixes
    `TestSparkReaderDeletes.testPosDeletesOnParquetFileWithMultipleRowGroups`
    in Iceberg Java 1.10 with DataFusion Comet.
    
    ## Which issue does this PR close?
    
    
    - Partially address #1749.
    
    ## What changes are included in this PR?
    
    - While `RecordBatchTransformer` does not have exhaustive nested-type
    support yet, this adds logic to `create_column` to handle the specific
    schema-evolution scenario where a newly added struct column uses the
    default NULL value.
    - If the column has a non-NULL default value defined, it will fall
    through to the existing catch-all match arm and return an "unsupported"
    error, as before.
    
    ## Are these changes tested?
    
    
    New test to reflect what happens with Iceberg Java 1.10's
    `TestSparkReaderDeletes.testPosDeletesOnParquetFileWithMultipleRowGroups`.
    The test's name is misleading: I expected positional deletes to be
    schema-agnostic (just a delete vector), but [it includes schema changes
    with binary and struct types, so we need default NULL values](https://github.com/apache/iceberg/blob/53c046efda5d6c6ac67caf7de29849ab7ac6d406/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java#L65).
---
 .../iceberg/src/arrow/record_batch_transformer.rs  | 93 ++++++++++++++++++++++
 1 file changed, 93 insertions(+)

diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs 
b/crates/iceberg/src/arrow/record_batch_transformer.rs
index e7d8b8f0..07ec4391 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -21,7 +21,9 @@ use std::sync::Arc;
 use arrow_array::{
     Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, 
Float32Array,
     Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, 
RecordBatchOptions, StringArray,
+    StructArray,
 };
+use arrow_buffer::NullBuffer;
 use arrow_cast::cast;
 use arrow_schema::{
     DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, 
SchemaRef,
@@ -594,6 +596,21 @@ impl RecordBatchTransformer {
                 let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
                 Arc::new(BinaryArray::from_opt_vec(vals))
             }
+            (DataType::Struct(fields), None) => {
+                // Create a StructArray filled with nulls. Per Iceberg spec, 
optional struct fields
+                // default to null when added to the schema. We defer non-null 
default struct values
+                // and leave them as not implemented yet.
+                let null_arrays: Vec<ArrayRef> = fields
+                    .iter()
+                    .map(|field| Self::create_column(field.data_type(), &None, 
num_rows))
+                    .collect::<Result<Vec<_>>>()?;
+
+                Arc::new(StructArray::new(
+                    fields.clone(),
+                    null_arrays,
+                    Some(NullBuffer::new_null(num_rows)),
+                ))
+            }
             (DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
             (dt, _) => {
                 return Err(Error::new(
@@ -743,6 +760,82 @@ mod test {
         assert!(date_column.is_null(2));
     }
 
+    #[test]
+    fn schema_evolution_adds_struct_column_with_nulls() {
+        // Test that when a struct column is added after data files are 
written,
+        // the transformer can materialize the missing struct column with null 
values.
+        // This reproduces the scenario from Iceberg 1.10.0 
TestSparkReaderDeletes tests
+        // where binaryData and structData columns were added to the schema.
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "data", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(
+                        3,
+                        "struct_col",
+                        Type::Struct(crate::spec::StructType::new(vec![
+                            NestedField::optional(
+                                100,
+                                "inner_field",
+                                Type::Primitive(PrimitiveType::String),
+                            )
+                            .into(),
+                        ])),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+        let projected_iceberg_field_ids = [1, 2, 3];
+
+        let mut transformer =
+            RecordBatchTransformer::build(snapshot_schema, 
&projected_iceberg_field_ids);
+
+        let file_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("data", DataType::Utf8, false, "2"),
+        ]));
+
+        let file_batch = RecordBatch::try_new(file_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["a", "b", "c"])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(file_batch).unwrap();
+
+        assert_eq!(result.num_columns(), 3);
+        assert_eq!(result.num_rows(), 3);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.values(), &[1, 2, 3]);
+
+        let data_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(data_column.value(0), "a");
+        assert_eq!(data_column.value(1), "b");
+        assert_eq!(data_column.value(2), "c");
+
+        let struct_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<arrow_array::StructArray>()
+            .unwrap();
+        assert!(struct_column.is_null(0));
+        assert!(struct_column.is_null(1));
+        assert!(struct_column.is_null(2));
+    }
+
     pub fn source_record_batch() -> RecordBatch {
         RecordBatch::try_new(
             arrow_schema_promotion_addition_and_renaming_required(),

Reply via email to