This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b3b5afed feat(datafusion): implement the project node to add the partition columns (#1602)
b3b5afed is described below

commit b3b5afed126ee5962dc3a4ca23cd735694449df8
Author: Florian Valeye <[email protected]>
AuthorDate: Tue Oct 21 11:53:58 2025 +0200

    feat(datafusion): implement the project node to add the partition columns (#1602)
    
    ## Which issue does this PR close?
    - Closes #1542
    
    ## What changes are included in this PR?
    Implement a physical execution plan node that projects Iceberg partition
    columns from source data, supporting nested fields and all Iceberg
    transforms.
    
    ## Are these changes tested?
    Yes, with unit tests
    
    ---------
    
    Signed-off-by: Florian Valeye <[email protected]>
---
 crates/iceberg/src/arrow/mod.rs                    |   3 +-
 crates/iceberg/src/arrow/record_batch_projector.rs |  71 ++-
 crates/iceberg/src/transform/mod.rs                |   4 +-
 .../datafusion/src/physical_plan/mod.rs            |   2 +
 .../datafusion/src/physical_plan/project.rs        | 488 +++++++++++++++++++++
 5 files changed, 563 insertions(+), 5 deletions(-)
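
For context, a minimal sketch of how a downstream planner might wire in the new
node. The `scan` plan and `table` handle here are hypothetical, and the
`iceberg_datafusion::physical_plan` import path assumes the re-export added in
this patch:

    use std::sync::Arc;

    use datafusion::common::Result as DFResult;
    use datafusion::physical_plan::ExecutionPlan;
    use iceberg::table::Table;
    use iceberg_datafusion::physical_plan::project_with_partition;

    /// Wrap an upstream plan so its output gains a trailing `_partition`
    /// struct column; unpartitioned tables pass through unchanged.
    fn plan_write(
        scan: Arc<dyn ExecutionPlan>,
        table: &Table,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        project_with_partition(scan, table)
    }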

diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index d32cbeb8..28116a4b 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -28,7 +28,8 @@ pub mod delete_file_loader;
 pub(crate) mod delete_filter;
 
 mod reader;
-pub(crate) mod record_batch_projector;
+/// RecordBatch projection utilities
+pub mod record_batch_projector;
 pub(crate) mod record_batch_transformer;
 mod value;
 
diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs
index 7ca28c25..45de0212 100644
--- a/crates/iceberg/src/arrow/record_batch_projector.rs
+++ b/crates/iceberg/src/arrow/record_batch_projector.rs
@@ -20,13 +20,16 @@ use std::sync::Arc;
 use arrow_array::{ArrayRef, RecordBatch, StructArray, make_array};
 use arrow_buffer::NullBuffer;
 use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
+use crate::arrow::schema::schema_to_arrow_schema;
 use crate::error::Result;
+use crate::spec::Schema as IcebergSchema;
 use crate::{Error, ErrorKind};
 
 /// Help to project specific field from `RecordBatch`` according to the fields id.
-#[derive(Clone, Debug)]
-pub(crate) struct RecordBatchProjector {
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RecordBatchProjector {
     // A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure.
     // E.g. [[0], [1, 2]] means the first field is accessed directly from the first column,
     // while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column).
@@ -77,6 +80,46 @@ impl RecordBatchProjector {
         })
     }
 
+    /// Create RecordBatchProjector using Iceberg schema.
+    ///
+    /// This constructor converts the Iceberg schema to Arrow schema with field ID metadata,
+    /// then uses the standard field ID lookup for projection.
+    ///
+    /// # Arguments
+    /// * `iceberg_schema` - The Iceberg schema for field ID mapping  
+    /// * `target_field_ids` - The field IDs to project
+    pub fn from_iceberg_schema(
+        iceberg_schema: Arc<IcebergSchema>,
+        target_field_ids: &[i32],
+    ) -> Result<Self> {
+        let arrow_schema_with_ids = Arc::new(schema_to_arrow_schema(&iceberg_schema)?);
+
+        let field_id_fetch_func = |field: &Field| -> Result<Option<i64>> {
+            if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
+                let field_id = value.parse::<i32>().map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Failed to parse field id".to_string(),
+                    )
+                    .with_context("value", value)
+                    .with_source(e)
+                })?;
+                Ok(Some(field_id as i64))
+            } else {
+                Ok(None)
+            }
+        };
+
+        let searchable_field_func = |_field: &Field| -> bool { true };
+
+        Self::new(
+            arrow_schema_with_ids,
+            target_field_ids,
+            field_id_fetch_func,
+            searchable_field_func,
+        )
+    }
+
     fn fetch_field_index<F1, F2>(
         fields: &Fields,
         index_vec: &mut Vec<usize>,
@@ -129,7 +172,7 @@ impl RecordBatchProjector {
     }
 
     /// Do projection with columns
-    pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
+    pub fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
         self.field_indices
             .iter()
             .map(|index_vec| Self::get_column_by_field_index(batch, index_vec))
@@ -166,6 +209,7 @@ mod test {
     use arrow_schema::{DataType, Field, Fields, Schema};
 
     use crate::arrow::record_batch_projector::RecordBatchProjector;
+    use crate::spec::{NestedField, PrimitiveType, Schema as IcebergSchema, Type};
     use crate::{Error, ErrorKind};
 
     #[test]
@@ -293,4 +337,25 @@ mod test {
             RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true);
         assert!(projector.is_ok());
     }
+
+    #[test]
+    fn test_from_iceberg_schema() {
+        let iceberg_schema = IcebergSchema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                NestedField::optional(3, "age", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let projector =
+            RecordBatchProjector::from_iceberg_schema(Arc::new(iceberg_schema), &[1, 3]).unwrap();
+
+        assert_eq!(projector.field_indices.len(), 2);
+        assert_eq!(projector.projected_schema_ref().fields().len(), 2);
+        assert_eq!(projector.projected_schema_ref().field(0).name(), "id");
+        assert_eq!(projector.projected_schema_ref().field(1).name(), "age");
+    }
 }
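
With `RecordBatchProjector` now public, the projector can be built outside the
crate from an Iceberg schema. A brief sketch mirroring the new unit test above
(the two-field schema and the projected field ID are illustrative):

    use std::sync::Arc;

    use iceberg::arrow::record_batch_projector::RecordBatchProjector;
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};

    fn build_projector() -> iceberg::Result<RecordBatchProjector> {
        // Illustrative two-field schema; field IDs 1 and 2 are arbitrary.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()?;
        // Keep only field ID 1; projected batches then contain just "id".
        RecordBatchProjector::from_iceberg_schema(Arc::new(schema), &[1])
    }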
diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs
index 4cc0d1fe..809d2daf 100644
--- a/crates/iceberg/src/transform/mod.rs
+++ b/crates/iceberg/src/transform/mod.rs
@@ -17,6 +17,8 @@
 
 //! Transform function used to compute partition values.
 
+use std::fmt::Debug;
+
 use arrow_array::ArrayRef;
 
 use crate::spec::{Datum, Transform};
@@ -29,7 +31,7 @@ mod truncate;
 mod void;
 
 /// TransformFunction is a trait that defines the interface for all transform functions.
-pub trait TransformFunction: Send + Sync {
+pub trait TransformFunction: Send + Sync + Debug {
     /// transform will take an input array and transform it into a new array.
     /// The implementation of this function will need to check and downcast the input to specific
     /// type.
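
The only functional change in this hunk is the `Debug` supertrait on
`TransformFunction`, which is what lets the new `PartitionValueCalculator`
below derive `Debug` while holding boxed transforms. A tiny sketch, assuming
`iceberg::Result` is the crate's re-exported result type:

    use iceberg::spec::Transform;
    use iceberg::transform::create_transform_function;

    fn main() -> iceberg::Result<()> {
        // Boxed transform functions are now Debug, so they can be printed
        // and embedded in other #[derive(Debug)] types.
        let identity = create_transform_function(&Transform::Identity)?;
        println!("{identity:?}");
        Ok(())
    }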
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index fcfd11a4..ce923b86 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -18,9 +18,11 @@
 pub(crate) mod commit;
 pub(crate) mod expr_to_predicate;
 pub(crate) mod metadata_scan;
+pub(crate) mod project;
 pub(crate) mod scan;
 pub(crate) mod write;
 
 pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";
 
+pub use project::project_with_partition;
 pub use scan::IcebergTableScan;
diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs
new file mode 100644
index 00000000..4bfe8192
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/project.rs
@@ -0,0 +1,488 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partition value projection for Iceberg tables.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{ArrayRef, RecordBatch, StructArray};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
+use datafusion::common::Result as DFResult;
+use datafusion::error::DataFusionError;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::expressions::Column;
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::{ColumnarValue, ExecutionPlan};
+use iceberg::arrow::record_batch_projector::RecordBatchProjector;
+use iceberg::spec::{PartitionSpec, Schema};
+use iceberg::table::Table;
+use iceberg::transform::BoxedTransformFunction;
+
+use crate::to_datafusion_error;
+
+/// Column name for the combined partition values struct
+const PARTITION_VALUES_COLUMN: &str = "_partition";
+
+/// Extends an ExecutionPlan with partition value calculations for Iceberg tables.
+///
+/// This function takes an input ExecutionPlan and extends it with an additional column
+/// containing calculated partition values based on the table's partition specification.
+/// For unpartitioned tables, returns the original plan unchanged.
+///
+/// # Arguments
+/// * `input` - The input ExecutionPlan to extend
+/// * `table` - The Iceberg table with partition specification
+///
+/// # Returns
+/// * `Ok(Arc<dyn ExecutionPlan>)` - Extended plan with partition values column
+/// * `Err` - If partition spec is not found or transformation fails
+pub fn project_with_partition(
+    input: Arc<dyn ExecutionPlan>,
+    table: &Table,
+) -> DFResult<Arc<dyn ExecutionPlan>> {
+    let metadata = table.metadata();
+    let partition_spec = metadata.default_partition_spec();
+    let table_schema = metadata.current_schema();
+
+    if partition_spec.is_unpartitioned() {
+        return Ok(input);
+    }
+
+    let input_schema = input.schema();
+    // TODO: Validate that input_schema matches the Iceberg table schema.
+    // See: https://github.com/apache/iceberg-rust/issues/1752
+    let partition_type = build_partition_type(partition_spec, table_schema.as_ref())?;
+    let calculator = PartitionValueCalculator::new(
+        partition_spec.as_ref().clone(),
+        table_schema.as_ref().clone(),
+        partition_type,
+    )?;
+
+    let mut projection_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+        Vec::with_capacity(input_schema.fields().len() + 1);
+
+    for (index, field) in input_schema.fields().iter().enumerate() {
+        let column_expr = Arc::new(Column::new(field.name(), index));
+        projection_exprs.push((column_expr, field.name().clone()));
+    }
+
+    let partition_expr = Arc::new(PartitionExpr::new(calculator));
+    projection_exprs.push((partition_expr, PARTITION_VALUES_COLUMN.to_string()));
+
+    let projection = ProjectionExec::try_new(projection_exprs, input)?;
+    Ok(Arc::new(projection))
+}
+
+/// PhysicalExpr implementation for partition value calculation
+#[derive(Debug, Clone)]
+struct PartitionExpr {
+    calculator: Arc<PartitionValueCalculator>,
+}
+
+impl PartitionExpr {
+    fn new(calculator: PartitionValueCalculator) -> Self {
+        Self {
+            calculator: Arc::new(calculator),
+        }
+    }
+}
+
+// Manual PartialEq/Eq implementations for pointer-based equality
+// (two PartitionExpr are equal if they share the same calculator instance)
+impl PartialEq for PartitionExpr {
+    fn eq(&self, other: &Self) -> bool {
+        Arc::ptr_eq(&self.calculator, &other.calculator)
+    }
+}
+
+impl Eq for PartitionExpr {}
+
+impl PhysicalExpr for PartitionExpr {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &ArrowSchema) -> DFResult<DataType> {
+        Ok(self.calculator.partition_type.clone())
+    }
+
+    fn nullable(&self, _input_schema: &ArrowSchema) -> DFResult<bool> {
+        Ok(false)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
+        let array = self.calculator.calculate(batch)?;
+        Ok(ColumnarValue::Array(array))
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> DFResult<Arc<dyn PhysicalExpr>> {
+        Ok(self)
+    }
+
+    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let field_names: Vec<String> = self
+            .calculator
+            .partition_spec
+            .fields()
+            .iter()
+            .map(|pf| format!("{}({})", pf.transform, pf.name))
+            .collect();
+        write!(f, "iceberg_partition_values[{}]", field_names.join(", "))
+    }
+}
+
+impl std::fmt::Display for PartitionExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let field_names: Vec<&str> = self
+            .calculator
+            .partition_spec
+            .fields()
+            .iter()
+            .map(|pf| pf.name.as_str())
+            .collect();
+        write!(f, "iceberg_partition_values({})", field_names.join(", "))
+    }
+}
+
+impl std::hash::Hash for PartitionExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        // Two PartitionExpr are equal if they share the same calculator Arc
+        Arc::as_ptr(&self.calculator).hash(state);
+    }
+}
+
+/// Calculator for partition values in Iceberg tables
+#[derive(Debug)]
+struct PartitionValueCalculator {
+    partition_spec: PartitionSpec,
+    partition_type: DataType,
+    projector: RecordBatchProjector,
+    transform_functions: Vec<BoxedTransformFunction>,
+}
+
+impl PartitionValueCalculator {
+    fn new(
+        partition_spec: PartitionSpec,
+        table_schema: Schema,
+        partition_type: DataType,
+    ) -> DFResult<Self> {
+        if partition_spec.is_unpartitioned() {
+            return Err(DataFusionError::Internal(
+                "Cannot create partition calculator for unpartitioned 
table".to_string(),
+            ));
+        }
+
+        let transform_functions: Result<Vec<BoxedTransformFunction>, _> = partition_spec
+            .fields()
+            .iter()
+            .map(|pf| iceberg::transform::create_transform_function(&pf.transform))
+            .collect();
+
+        let transform_functions = transform_functions.map_err(to_datafusion_error)?;
+
+        let source_field_ids: Vec<i32> = partition_spec
+            .fields()
+            .iter()
+            .map(|pf| pf.source_id)
+            .collect();
+
+        let projector = RecordBatchProjector::from_iceberg_schema(
+            Arc::new(table_schema.clone()),
+            &source_field_ids,
+        )
+        .map_err(to_datafusion_error)?;
+
+        Ok(Self {
+            partition_spec,
+            partition_type,
+            projector,
+            transform_functions,
+        })
+    }
+
+    fn calculate(&self, batch: &RecordBatch) -> DFResult<ArrayRef> {
+        let source_columns = self
+            .projector
+            .project_column(batch.columns())
+            .map_err(to_datafusion_error)?;
+
+        let expected_struct_fields = match &self.partition_type {
+            DataType::Struct(fields) => fields.clone(),
+            _ => {
+                return Err(DataFusionError::Internal(
+                    "Expected partition type must be a struct".to_string(),
+                ));
+            }
+        };
+
+        let mut partition_values = Vec::with_capacity(self.partition_spec.fields().len());
+
+        for (source_column, transform_fn) in source_columns.iter().zip(&self.transform_functions) {
+            let partition_value = transform_fn
+                .transform(source_column.clone())
+                .map_err(to_datafusion_error)?;
+
+            partition_values.push(partition_value);
+        }
+
+        let struct_array = StructArray::try_new(expected_struct_fields, partition_values, None)
+            .map_err(|e| DataFusionError::ArrowError(e, None))?;
+
+        Ok(Arc::new(struct_array))
+    }
+}
+
+fn build_partition_type(
+    partition_spec: &PartitionSpec,
+    table_schema: &Schema,
+) -> DFResult<DataType> {
+    let partition_struct_type = partition_spec
+        .partition_type(table_schema)
+        .map_err(to_datafusion_error)?;
+
+    iceberg::arrow::type_to_arrow_type(&iceberg::spec::Type::Struct(partition_struct_type))
+        .map_err(to_datafusion_error)
+}
+
+#[cfg(test)]
+mod tests {
+    use datafusion::arrow::array::Int32Array;
+    use datafusion::arrow::datatypes::{Field, Fields};
+    use datafusion::physical_plan::empty::EmptyExec;
+    use iceberg::spec::{NestedField, PrimitiveType, StructType, Transform, Type};
+
+    use super::*;
+
+    #[test]
+    fn test_partition_calculator_basic() {
+        let table_schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone()))
+            .add_partition_field("id", "id_partition", Transform::Identity)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let _arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap();
+        let calculator = PartitionValueCalculator::new(
+            partition_spec.clone(),
+            table_schema,
+            partition_type.clone(),
+        )
+        .unwrap();
+
+        assert_eq!(calculator.partition_type, partition_type);
+    }
+
+    #[test]
+    fn test_partition_expr_with_projection() {
+        let table_schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone()))
+            .add_partition_field("id", "id_partition", Transform::Identity)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let input = Arc::new(EmptyExec::new(arrow_schema.clone()));
+
+        let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap();
+        let calculator =
+            PartitionValueCalculator::new(partition_spec, table_schema, partition_type).unwrap();
+
+        let mut projection_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+            Vec::with_capacity(arrow_schema.fields().len() + 1);
+        for (i, field) in arrow_schema.fields().iter().enumerate() {
+            let column_expr = Arc::new(Column::new(field.name(), i));
+            projection_exprs.push((column_expr, field.name().clone()));
+        }
+
+        let partition_expr = Arc::new(PartitionExpr::new(calculator));
+        projection_exprs.push((partition_expr, PARTITION_VALUES_COLUMN.to_string()));
+
+        let projection = ProjectionExec::try_new(projection_exprs, input).unwrap();
+        let result = Arc::new(projection);
+
+        assert_eq!(result.schema().fields().len(), 3);
+        assert_eq!(result.schema().field(0).name(), "id");
+        assert_eq!(result.schema().field(1).name(), "name");
+        assert_eq!(result.schema().field(2).name(), "_partition");
+    }
+
+    #[test]
+    fn test_partition_expr_evaluate() {
+        let table_schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone()))
+            .add_partition_field("id", "id_partition", Transform::Identity)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("data", DataType::Utf8, false),
+        ]));
+
+        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
+            Arc::new(Int32Array::from(vec![10, 20, 30])),
+            Arc::new(datafusion::arrow::array::StringArray::from(vec![
+                "a", "b", "c",
+            ])),
+        ])
+        .unwrap();
+
+        let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap();
+        let calculator =
+            PartitionValueCalculator::new(partition_spec, table_schema, partition_type.clone())
+                .unwrap();
+        let expr = PartitionExpr::new(calculator);
+
+        assert_eq!(expr.data_type(&arrow_schema).unwrap(), partition_type);
+        assert!(!expr.nullable(&arrow_schema).unwrap());
+
+        let result = expr.evaluate(&batch).unwrap();
+        match result {
+            ColumnarValue::Array(array) => {
+                let struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();
+                let id_partition = struct_array
+                    .column_by_name("id_partition")
+                    .unwrap()
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap();
+                assert_eq!(id_partition.value(0), 10);
+                assert_eq!(id_partition.value(1), 20);
+                assert_eq!(id_partition.value(2), 30);
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_nested_partition() {
+        let address_struct = StructType::new(vec![
+            NestedField::required(3, "street", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(4, "city", Type::Primitive(PrimitiveType::String)).into(),
+        ]);
+
+        let table_schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "address", Type::Struct(address_struct)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone()))
+            .add_partition_field("address.city", "city_partition", Transform::Identity)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let struct_fields = Fields::from(vec![
+            Field::new("street", DataType::Utf8, false),
+            Field::new("city", DataType::Utf8, false),
+        ]);
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("address", DataType::Struct(struct_fields), false),
+        ]));
+
+        let street_array = Arc::new(datafusion::arrow::array::StringArray::from(vec![
+            "123 Main St",
+            "456 Oak Ave",
+        ]));
+        let city_array = Arc::new(datafusion::arrow::array::StringArray::from(vec![
+            "New York",
+            "Los Angeles",
+        ]));
+
+        let struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("street", DataType::Utf8, false)),
+                street_array as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("city", DataType::Utf8, false)),
+                city_array as ArrayRef,
+            ),
+        ]);
+
+        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
+            Arc::new(Int32Array::from(vec![1, 2])),
+            Arc::new(struct_array),
+        ])
+        .unwrap();
+
+        let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap();
+        let calculator =
+            PartitionValueCalculator::new(partition_spec, table_schema, partition_type).unwrap();
+        let array = calculator.calculate(&batch).unwrap();
+
+        let struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();
+        let city_partition = struct_array
+            .column_by_name("city_partition")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<datafusion::arrow::array::StringArray>()
+            .unwrap();
+
+        assert_eq!(city_partition.value(0), "New York");
+        assert_eq!(city_partition.value(1), "Los Angeles");
+    }
+}
