This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b20a2554a feat(reader): Add Date32 support to RecordBatchTransformer 
create_column (#1792)
b20a2554a is described below

commit b20a2554a1a95e881e92ec4312fd4f75fad6da2f
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Oct 28 13:01:57 2025 -0400

    feat(reader): Add Date32 support to RecordBatchTransformer create_column 
(#1792)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Partially address #1749.
    
    Iceberg Java has a test that does a schema change requires the
    `RecordBatchTransformer` to add a Date32 column, which it currently does
    not support.
    
    ## What changes are included in this PR?
    
    <!--
    Provide a summary of the modifications in this PR. List the main changes
    such as new features, bug fixes, refactoring, or any other updates.
    -->
    
    Add match arms for `Date32` type in `create_column`.
    
    ## Are these changes tested?
    
    <!--
    Specify what test covers (unit test, integration test, etc.).
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    New test that mirrors Iceberg Java's
    `TestSelect.readAndWriteWithBranchAfterSchemaChange`
    
    ---------
    
    Co-authored-by: Xuanwo <[email protected]>
---
 .../iceberg/src/arrow/record_batch_transformer.rs  | 82 +++++++++++++++++++++-
 1 file changed, 79 insertions(+), 3 deletions(-)

diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs 
b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 779f1cc62..71fe59dea 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -19,8 +19,8 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow_array::{
-    Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, 
Float64Array,
-    Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, 
StringArray,
+    Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, 
Float32Array,
+    Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, 
RecordBatchOptions, StringArray,
 };
 use arrow_cast::cast;
 use arrow_schema::{
@@ -401,6 +401,13 @@ impl RecordBatchTransformer {
                 let vals: Vec<Option<i32>> = vec![None; num_rows];
                 Arc::new(Int32Array::from(vals))
             }
+            (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
+                Arc::new(Date32Array::from(vec![*value; num_rows]))
+            }
+            (DataType::Date32, None) => {
+                let vals: Vec<Option<i32>> = vec![None; num_rows];
+                Arc::new(Date32Array::from(vals))
+            }
             (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
                 Arc::new(Int64Array::from(vec![*value; num_rows]))
             }
@@ -453,7 +460,8 @@ mod test {
     use std::sync::Arc;
 
     use arrow_array::{
-        Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, 
StringArray,
+        Array, Date32Array, Float32Array, Float64Array, Int32Array, 
Int64Array, RecordBatch,
+        StringArray,
     };
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
@@ -509,6 +517,74 @@ mod test {
         assert_eq!(result, expected);
     }
 
+    #[test]
+    fn schema_evolution_adds_date_column_with_nulls() {
+        // Reproduces TestSelect.readAndWriteWithBranchAfterSchemaChange from 
iceberg-spark.
+        // When reading old snapshots after adding a DATE column, the 
transformer must
+        // populate the new column with NULL values since old files lack this 
field.
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(3, "date_col", 
Type::Primitive(PrimitiveType::Date))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+        let projected_iceberg_field_ids = [1, 2, 3];
+
+        let mut transformer =
+            RecordBatchTransformer::build(snapshot_schema, 
&projected_iceberg_field_ids);
+
+        let file_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("name", DataType::Utf8, true, "2"),
+        ]));
+
+        let file_batch = RecordBatch::try_new(file_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec![
+                Some("Alice"),
+                Some("Bob"),
+                Some("Charlie"),
+            ])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(file_batch).unwrap();
+
+        assert_eq!(result.num_columns(), 3);
+        assert_eq!(result.num_rows(), 3);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.values(), &[1, 2, 3]);
+
+        let name_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_column.value(0), "Alice");
+        assert_eq!(name_column.value(1), "Bob");
+        assert_eq!(name_column.value(2), "Charlie");
+
+        let date_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .unwrap();
+        assert!(date_column.is_null(0));
+        assert!(date_column.is_null(1));
+        assert!(date_column.is_null(2));
+    }
+
     pub fn source_record_batch() -> RecordBatch {
         RecordBatch::try_new(
             arrow_schema_promotion_addition_and_renaming_required(),

Reply via email to