This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b3b5afed feat(datafusion): implement the project node to add the partition columns (#1602)
b3b5afed is described below
commit b3b5afed126ee5962dc3a4ca23cd735694449df8
Author: Florian Valeye <[email protected]>
AuthorDate: Tue Oct 21 11:53:58 2025 +0200
feat(datafusion): implement the project node to add the partition columns (#1602)
## Which issue does this PR close?
- Closes #1542
## What changes are included in this PR?
Implement a physical execution plan node that projects Iceberg partition
columns from source data, supporting nested fields and all Iceberg
transforms.
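For reference, a minimal usage sketch of the new API (a hedged illustration, not part of this change: the `scan_plan`/`table` values and the `iceberg_datafusion::physical_plan` import path are assumptions):

```rust
use std::sync::Arc;

use datafusion::common::Result as DFResult;
use datafusion::physical_plan::ExecutionPlan;
use iceberg::table::Table;
// Assumed re-export path; adjust to the crate's actual public path.
use iceberg_datafusion::physical_plan::project_with_partition;

/// Wrap an upstream plan so a `_partition` struct column is appended.
/// For unpartitioned tables the input plan is returned unchanged.
fn add_partition_column(
    scan_plan: Arc<dyn ExecutionPlan>,
    table: &Table,
) -> DFResult<Arc<dyn ExecutionPlan>> {
    project_with_partition(scan_plan, table)
}
```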
## Are these changes tested?
Yes, with unit tests
---------
Signed-off-by: Florian Valeye <[email protected]>
---
crates/iceberg/src/arrow/mod.rs | 3 +-
crates/iceberg/src/arrow/record_batch_projector.rs | 71 ++-
crates/iceberg/src/transform/mod.rs | 4 +-
.../datafusion/src/physical_plan/mod.rs | 2 +
.../datafusion/src/physical_plan/project.rs | 488 +++++++++++++++++++++
5 files changed, 563 insertions(+), 5 deletions(-)
diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index d32cbeb8..28116a4b 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -28,7 +28,8 @@ pub mod delete_file_loader;
pub(crate) mod delete_filter;
mod reader;
-pub(crate) mod record_batch_projector;
+/// RecordBatch projection utilities
+pub mod record_batch_projector;
pub(crate) mod record_batch_transformer;
mod value;
diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs
index 7ca28c25..45de0212 100644
--- a/crates/iceberg/src/arrow/record_batch_projector.rs
+++ b/crates/iceberg/src/arrow/record_batch_projector.rs
@@ -20,13 +20,16 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch, StructArray, make_array};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use crate::arrow::schema::schema_to_arrow_schema;
use crate::error::Result;
+use crate::spec::Schema as IcebergSchema;
use crate::{Error, ErrorKind};
/// Help to project specific field from `RecordBatch`` according to the fields id.
-#[derive(Clone, Debug)]
-pub(crate) struct RecordBatchProjector {
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RecordBatchProjector {
// A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure.
// E.g. [[0], [1, 2]] means the first field is accessed directly from the first column,
// while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column).
@@ -77,6 +80,46 @@ impl RecordBatchProjector {
})
}
+ /// Create RecordBatchProjector using Iceberg schema.
+ ///
+ /// This constructor converts the Iceberg schema to Arrow schema with field ID metadata,
+ /// then uses the standard field ID lookup for projection.
+ ///
+ /// # Arguments
+ /// * `iceberg_schema` - The Iceberg schema for field ID mapping
+ /// * `target_field_ids` - The field IDs to project
+ pub fn from_iceberg_schema(
+ iceberg_schema: Arc<IcebergSchema>,
+ target_field_ids: &[i32],
+ ) -> Result<Self> {
+ let arrow_schema_with_ids = Arc::new(schema_to_arrow_schema(&iceberg_schema)?);
+
+ let field_id_fetch_func = |field: &Field| -> Result<Option<i64>> {
+ if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
+ let field_id = value.parse::<i32>().map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Failed to parse field id".to_string(),
+ )
+ .with_context("value", value)
+ .with_source(e)
+ })?;
+ Ok(Some(field_id as i64))
+ } else {
+ Ok(None)
+ }
+ };
+
+ let searchable_field_func = |_field: &Field| -> bool { true };
+
+ Self::new(
+ arrow_schema_with_ids,
+ target_field_ids,
+ field_id_fetch_func,
+ searchable_field_func,
+ )
+ }
+
fn fetch_field_index<F1, F2>(
fields: &Fields,
index_vec: &mut Vec<usize>,
@@ -129,7 +172,7 @@ impl RecordBatchProjector {
}
/// Do projection with columns
- pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
+ pub fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
self.field_indices
.iter()
.map(|index_vec| Self::get_column_by_field_index(batch, index_vec))
@@ -166,6 +209,7 @@ mod test {
use arrow_schema::{DataType, Field, Fields, Schema};
use crate::arrow::record_batch_projector::RecordBatchProjector;
+ use crate::spec::{NestedField, PrimitiveType, Schema as IcebergSchema, Type};
use crate::{Error, ErrorKind};
#[test]
@@ -293,4 +337,25 @@ mod test {
RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true);
assert!(projector.is_ok());
}
+
+ #[test]
+ fn test_from_iceberg_schema() {
+ let iceberg_schema = IcebergSchema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(3, "age", Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let projector =
+ RecordBatchProjector::from_iceberg_schema(Arc::new(iceberg_schema), &[1, 3]).unwrap();
+
+ assert_eq!(projector.field_indices.len(), 2);
+ assert_eq!(projector.projected_schema_ref().fields().len(), 2);
+ assert_eq!(projector.projected_schema_ref().field(0).name(), "id");
+ assert_eq!(projector.projected_schema_ref().field(1).name(), "age");
+ }
}
diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs
index 4cc0d1fe..809d2daf 100644
--- a/crates/iceberg/src/transform/mod.rs
+++ b/crates/iceberg/src/transform/mod.rs
@@ -17,6 +17,8 @@
//! Transform function used to compute partition values.
+use std::fmt::Debug;
+
use arrow_array::ArrayRef;
use crate::spec::{Datum, Transform};
@@ -29,7 +31,7 @@ mod truncate;
mod void;
/// TransformFunction is a trait that defines the interface for all transform functions.
-pub trait TransformFunction: Send + Sync {
+pub trait TransformFunction: Send + Sync + Debug {
/// transform will take an input array and transform it into a new array.
/// The implementation of this function will need to check and downcast the input to specific
/// type.
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index fcfd11a4..ce923b86 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -18,9 +18,11 @@
pub(crate) mod commit;
pub(crate) mod expr_to_predicate;
pub(crate) mod metadata_scan;
+pub(crate) mod project;
pub(crate) mod scan;
pub(crate) mod write;
pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";
+pub use project::project_with_partition;
pub use scan::IcebergTableScan;
diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs
new file mode 100644
index 00000000..4bfe8192
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/project.rs
@@ -0,0 +1,488 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partition value projection for Iceberg tables.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{ArrayRef, RecordBatch, StructArray};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
+use datafusion::common::Result as DFResult;
+use datafusion::error::DataFusionError;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::expressions::Column;
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::{ColumnarValue, ExecutionPlan};
+use iceberg::arrow::record_batch_projector::RecordBatchProjector;
+use iceberg::spec::{PartitionSpec, Schema};
+use iceberg::table::Table;
+use iceberg::transform::BoxedTransformFunction;
+
+use crate::to_datafusion_error;
+
+/// Column name for the combined partition values struct
+const PARTITION_VALUES_COLUMN: &str = "_partition";
+
+/// Extends an ExecutionPlan with partition value calculations for Iceberg tables.
+///
+/// This function takes an input ExecutionPlan and extends it with an additional column
+/// containing calculated partition values based on the table's partition specification.
+/// For unpartitioned tables, returns the original plan unchanged.
+///
+/// # Arguments
+/// * `input` - The input ExecutionPlan to extend
+/// * `table` - The Iceberg table with partition specification
+///
+/// # Returns
+/// * `Ok(Arc<dyn ExecutionPlan>)` - Extended plan with partition values column
+/// * `Err` - If partition spec is not found or transformation fails
+pub fn project_with_partition(
+ input: Arc<dyn ExecutionPlan>,
+ table: &Table,
+) -> DFResult<Arc<dyn ExecutionPlan>> {
+ let metadata = table.metadata();
+ let partition_spec = metadata.default_partition_spec();
+ let table_schema = metadata.current_schema();
+
+ if partition_spec.is_unpartitioned() {
+ return Ok(input);
+ }
+
+ let input_schema = input.schema();
+ // TODO: Validate that input_schema matches the Iceberg table schema.
+ // See: https://github.com/apache/iceberg-rust/issues/1752
+ let partition_type = build_partition_type(partition_spec, table_schema.as_ref())?;
+ let calculator = PartitionValueCalculator::new(
+ partition_spec.as_ref().clone(),
+ table_schema.as_ref().clone(),
+ partition_type,
+ )?;
+
+ let mut projection_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ Vec::with_capacity(input_schema.fields().len() + 1);
+
+ for (index, field) in input_schema.fields().iter().enumerate() {
+ let column_expr = Arc::new(Column::new(field.name(), index));
+ projection_exprs.push((column_expr, field.name().clone()));
+ }
+
+ let partition_expr = Arc::new(PartitionExpr::new(calculator));
+ projection_exprs.push((partition_expr, PARTITION_VALUES_COLUMN.to_string()));
+
+ let projection = ProjectionExec::try_new(projection_exprs, input)?;
+ Ok(Arc::new(projection))
+}
+
+/// PhysicalExpr implementation for partition value calculation
+#[derive(Debug, Clone)]
+struct PartitionExpr {
+ calculator: Arc<PartitionValueCalculator>,
+}
+
+impl PartitionExpr {
+ fn new(calculator: PartitionValueCalculator) -> Self {
+ Self {
+ calculator: Arc::new(calculator),
+ }
+ }
+}
+
+// Manual PartialEq/Eq implementations for pointer-based equality
+// (two PartitionExpr are equal if they share the same calculator instance)
+impl PartialEq for PartitionExpr {
+ fn eq(&self, other: &Self) -> bool {
+ Arc::ptr_eq(&self.calculator, &other.calculator)
+ }
+}
+
+impl Eq for PartitionExpr {}
+
+impl PhysicalExpr for PartitionExpr {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn data_type(&self, _input_schema: &ArrowSchema) -> DFResult<DataType> {
+ Ok(self.calculator.partition_type.clone())
+ }
+
+ fn nullable(&self, _input_schema: &ArrowSchema) -> DFResult<bool> {
+ Ok(false)
+ }
+
+ fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
+ let array = self.calculator.calculate(batch)?;
+ Ok(ColumnarValue::Array(array))
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _children: Vec<Arc<dyn PhysicalExpr>>,
+ ) -> DFResult<Arc<dyn PhysicalExpr>> {
+ Ok(self)
+ }
+
+ fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let field_names: Vec<String> = self
+ .calculator
+ .partition_spec
+ .fields()
+ .iter()
+ .map(|pf| format!("{}({})", pf.transform, pf.name))
+ .collect();
+ write!(f, "iceberg_partition_values[{}]", field_names.join(", "))
+ }
+}
+
+impl std::fmt::Display for PartitionExpr {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let field_names: Vec<&str> = self
+ .calculator
+ .partition_spec
+ .fields()
+ .iter()
+ .map(|pf| pf.name.as_str())
+ .collect();
+ write!(f, "iceberg_partition_values({})", field_names.join(", "))
+ }
+}
+
+impl std::hash::Hash for PartitionExpr {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ // Two PartitionExpr are equal if they share the same calculator Arc
+ Arc::as_ptr(&self.calculator).hash(state);
+ }
+}
+
+/// Calculator for partition values in Iceberg tables
+#[derive(Debug)]
+struct PartitionValueCalculator {
+ partition_spec: PartitionSpec,
+ partition_type: DataType,
+ projector: RecordBatchProjector,
+ transform_functions: Vec<BoxedTransformFunction>,
+}
+
+impl PartitionValueCalculator {
+ fn new(
+ partition_spec: PartitionSpec,
+ table_schema: Schema,
+ partition_type: DataType,
+ ) -> DFResult<Self> {
+ if partition_spec.is_unpartitioned() {
+ return Err(DataFusionError::Internal(
+ "Cannot create partition calculator for unpartitioned
table".to_string(),
+ ));
+ }
+
+ let transform_functions: Result<Vec<BoxedTransformFunction>, _> = partition_spec
+ .fields()
+ .iter()
+ .map(|pf| iceberg::transform::create_transform_function(&pf.transform))
+ .collect();
+
+ let transform_functions = transform_functions.map_err(to_datafusion_error)?;
+
+ let source_field_ids: Vec<i32> = partition_spec
+ .fields()
+ .iter()
+ .map(|pf| pf.source_id)
+ .collect();
+
+ let projector = RecordBatchProjector::from_iceberg_schema(
+ Arc::new(table_schema.clone()),
+ &source_field_ids,
+ )
+ .map_err(to_datafusion_error)?;
+
+ Ok(Self {
+ partition_spec,
+ partition_type,
+ projector,
+ transform_functions,
+ })
+ }
+
+ fn calculate(&self, batch: &RecordBatch) -> DFResult<ArrayRef> {
+ let source_columns = self
+ .projector
+ .project_column(batch.columns())
+ .map_err(to_datafusion_error)?;
+
+ let expected_struct_fields = match &self.partition_type {
+ DataType::Struct(fields) => fields.clone(),
+ _ => {
+ return Err(DataFusionError::Internal(
+ "Expected partition type must be a struct".to_string(),
+ ));
+ }
+ };
+
+ let mut partition_values = Vec::with_capacity(self.partition_spec.fields().len());
+
+ for (source_column, transform_fn) in source_columns.iter().zip(&self.transform_functions) {
+ let partition_value = transform_fn
+ .transform(source_column.clone())
+ .map_err(to_datafusion_error)?;
+
+ partition_values.push(partition_value);
+ }
+
+ let struct_array = StructArray::try_new(expected_struct_fields, partition_values, None)
+ .map_err(|e| DataFusionError::ArrowError(e, None))?;
+
+ Ok(Arc::new(struct_array))
+ }
+}
+
+fn build_partition_type(
+ partition_spec: &PartitionSpec,
+ table_schema: &Schema,
+) -> DFResult<DataType> {
+ let partition_struct_type = partition_spec
+ .partition_type(table_schema)
+ .map_err(to_datafusion_error)?;
+
+ iceberg::arrow::type_to_arrow_type(&iceberg::spec::Type::Struct(partition_struct_type))
+ .map_err(to_datafusion_error)
+}
+
+#[cfg(test)]
+mod tests {
+ use datafusion::arrow::array::Int32Array;
+ use datafusion::arrow::datatypes::{Field, Fields};
+ use datafusion::physical_plan::empty::EmptyExec;
+ use iceberg::spec::{NestedField, PrimitiveType, StructType, Transform, Type};
+
+ use super::*;
+
+ #[test]
+ fn test_partition_calculator_basic() {
+ let table_schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone()))
+ .add_partition_field("id", "id_partition", Transform::Identity)
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let _arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ ]));
+
+ let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap();
+ let calculator = PartitionValueCalculator::new(
+ partition_spec.clone(),
+ table_schema,
+ partition_type.clone(),
+ )
+ .unwrap();
+
+ assert_eq!(calculator.partition_type, partition_type);
+ }
+
+ #[test]
+ fn test_partition_expr_with_projection() {
+ let table_schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone()))
+ .add_partition_field("id", "id_partition", Transform::Identity)
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ ]));
+
+ let input = Arc::new(EmptyExec::new(arrow_schema.clone()));
+
+ let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap();
+ let calculator =
+ PartitionValueCalculator::new(partition_spec, table_schema, partition_type).unwrap();
+
+ let mut projection_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ Vec::with_capacity(arrow_schema.fields().len() + 1);
+ for (i, field) in arrow_schema.fields().iter().enumerate() {
+ let column_expr = Arc::new(Column::new(field.name(), i));
+ projection_exprs.push((column_expr, field.name().clone()));
+ }
+
+ let partition_expr = Arc::new(PartitionExpr::new(calculator));
+ projection_exprs.push((partition_expr, PARTITION_VALUES_COLUMN.to_string()));
+
+ let projection = ProjectionExec::try_new(projection_exprs, input).unwrap();
+ let result = Arc::new(projection);
+
+ assert_eq!(result.schema().fields().len(), 3);
+ assert_eq!(result.schema().field(0).name(), "id");
+ assert_eq!(result.schema().field(1).name(), "name");
+ assert_eq!(result.schema().field(2).name(), "_partition");
+ }
+
+ #[test]
+ fn test_partition_expr_evaluate() {
+ let table_schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone()))
+ .add_partition_field("id", "id_partition", Transform::Identity)
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("data", DataType::Utf8, false),
+ ]));
+
+ let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
+ Arc::new(Int32Array::from(vec![10, 20, 30])),
+ Arc::new(datafusion::arrow::array::StringArray::from(vec![
+ "a", "b", "c",
+ ])),
+ ])
+ .unwrap();
+
+ let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap();
+ let calculator =
+ PartitionValueCalculator::new(partition_spec, table_schema, partition_type.clone())
+ .unwrap();
+ let expr = PartitionExpr::new(calculator);
+
+ assert_eq!(expr.data_type(&arrow_schema).unwrap(), partition_type);
+ assert!(!expr.nullable(&arrow_schema).unwrap());
+
+ let result = expr.evaluate(&batch).unwrap();
+ match result {
+ ColumnarValue::Array(array) => {
+ let struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();
+ let id_partition = struct_array
+ .column_by_name("id_partition")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(id_partition.value(0), 10);
+ assert_eq!(id_partition.value(1), 20);
+ assert_eq!(id_partition.value(2), 30);
+ }
+ _ => panic!("Expected array result"),
+ }
+ }
+
+ #[test]
+ fn test_nested_partition() {
+ let address_struct = StructType::new(vec![
+ NestedField::required(3, "street", Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(4, "city", Type::Primitive(PrimitiveType::String)).into(),
+ ]);
+
+ let table_schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "address", Type::Struct(address_struct)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone()))
+ .add_partition_field("address.city", "city_partition", Transform::Identity)
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let struct_fields = Fields::from(vec![
+ Field::new("street", DataType::Utf8, false),
+ Field::new("city", DataType::Utf8, false),
+ ]);
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("address", DataType::Struct(struct_fields), false),
+ ]));
+
+ let street_array = Arc::new(datafusion::arrow::array::StringArray::from(vec![
+ "123 Main St",
+ "456 Oak Ave",
+ ]));
+ let city_array = Arc::new(datafusion::arrow::array::StringArray::from(vec![
+ "New York",
+ "Los Angeles",
+ ]));
+
+ let struct_array = StructArray::from(vec![
+ (
+ Arc::new(Field::new("street", DataType::Utf8, false)),
+ street_array as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("city", DataType::Utf8, false)),
+ city_array as ArrayRef,
+ ),
+ ]);
+
+ let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
+ Arc::new(Int32Array::from(vec![1, 2])),
+ Arc::new(struct_array),
+ ])
+ .unwrap();
+
+ let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap();
+ let calculator =
+ PartitionValueCalculator::new(partition_spec, table_schema, partition_type).unwrap();
+ let array = calculator.calculate(&batch).unwrap();
+
+ let struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();
+ let city_partition = struct_array
+ .column_by_name("city_partition")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<datafusion::arrow::array::StringArray>()
+ .unwrap();
+
+ assert_eq!(city_partition.value(0), "New York");
+ assert_eq!(city_partition.value(1), "Los Angeles");
+ }
+}