This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c1a9e68 RecordBatchTransformer: Handle schema migration and column 
re-ordering in table scans (#602)
5c1a9e68 is described below

commit 5c1a9e68da346819072a15327080a498ad91c488
Author: Scott Donnelly <[email protected]>
AuthorDate: Fri Oct 11 08:27:47 2024 +0100

    RecordBatchTransformer: Handle schema migration and column re-ordering in 
table scans (#602)
    
    * feat: Add skeleton of RecordBatchEvolutionProcessor
    
    * feat: Add initial implementation of RecordBatchEvolutionProcessor
    
    * feat: support more column types. Improve error handling. Add more comments
    
    * feat(wip): adress issues with reordered / skipped fields
    
    * feat: RecordBatchEvolutionProcessor handles skipped fields in projection
    
    * chore: add missing license header
    
    * chore: remove unneeded comment
    
    * refactor: rename to RecordBatchTransformer. Improve passthrough handling
    
    * feat: more performant handling of case where only schema transform is 
required but columns can remain unmodified
    
    * refactor: import arrow_cast rather than arrow
---
 Cargo.toml                                         |   1 +
 crates/iceberg/Cargo.toml                          |   1 +
 crates/iceberg/src/arrow/mod.rs                    |   2 +
 crates/iceberg/src/arrow/reader.rs                 |  11 +-
 .../iceberg/src/arrow/record_batch_transformer.rs  | 622 +++++++++++++++++++++
 crates/iceberg/src/arrow/schema.rs                 |   2 +-
 crates/iceberg/src/error.rs                        |   6 +
 crates/iceberg/src/lib.rs                          |   1 +
 crates/iceberg/src/scan.rs                         |  27 +
 9 files changed, 671 insertions(+), 2 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 82f98103..5e2b8973 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,6 +41,7 @@ apache-avro = "0.17"
 array-init = "2"
 arrow-arith = { version = "53" }
 arrow-array = { version = "53" }
+arrow-cast = { version = "53" }
 arrow-ord = { version = "53" }
 arrow-schema = { version = "53" }
 arrow-select = { version = "53" }
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 4d016094..1307cc6f 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -46,6 +46,7 @@ apache-avro = { workspace = true }
 array-init = { workspace = true }
 arrow-arith = { workspace = true }
 arrow-array = { workspace = true }
+arrow-cast = { workspace = true }
 arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index 2076a958..31a892fa 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -20,4 +20,6 @@
 mod schema;
 pub use schema::*;
 mod reader;
+pub(crate) mod record_batch_transformer;
+
 pub use reader::*;
diff --git a/crates/iceberg/src/arrow/reader.rs 
b/crates/iceberg/src/arrow/reader.rs
index f6680e31..66c233f6 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -38,6 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, 
ProjectionMask, PARQUET_FI
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
+use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, 
BoundPredicateVisitor};
@@ -209,6 +210,12 @@ impl ArrowReader {
         )?;
         record_batch_stream_builder = 
record_batch_stream_builder.with_projection(projection_mask);
 
+        // RecordBatchTransformer performs any required transformations on the 
RecordBatches
+        // that come back from the file, such as type promotion, default 
column insertion
+        // and column re-ordering
+        let mut record_batch_transformer =
+            RecordBatchTransformer::build(task.schema_ref(), 
task.project_field_ids());
+
         if let Some(batch_size) = batch_size {
             record_batch_stream_builder = 
record_batch_stream_builder.with_batch_size(batch_size);
         }
@@ -261,8 +268,10 @@ impl ArrowReader {
         // Build the batch stream and send all the RecordBatches that it 
generates
         // to the requester.
         let mut record_batch_stream = record_batch_stream_builder.build()?;
+
         while let Some(batch) = record_batch_stream.try_next().await? {
-            tx.send(Ok(batch)).await?
+            tx.send(record_batch_transformer.process_record_batch(batch))
+                .await?
         }
 
         Ok(())
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs 
b/crates/iceberg/src/arrow/record_batch_transformer.rs
new file mode 100644
index 00000000..01ce9f0a
--- /dev/null
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -0,0 +1,622 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::{
+    Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, 
Float64Array,
+    Int32Array, Int64Array, NullArray, RecordBatch, StringArray,
+};
+use arrow_cast::cast;
+use arrow_schema::{
+    DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, 
SchemaRef,
+};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::arrow::schema_to_arrow_schema;
+use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
+use crate::{Error, ErrorKind, Result};
+
+/// Indicates how a particular column in a processed RecordBatch should
+/// be sourced.
+#[derive(Debug)]
+pub(crate) enum ColumnSource {
+    // signifies that a column should be passed through unmodified
+    // from the file's RecordBatch
+    PassThrough {
+        source_index: usize,
+    },
+
+    // signifies that a column from the file's RecordBatch has undergone
+    // type promotion so the source column with the given index needs
+    // to be promoted to the specified type
+    Promote {
+        target_type: DataType,
+        source_index: usize,
+    },
+
+    // Signifies that a new column has been inserted before the column
+    // with index `index`. (we choose "before" rather than "after" so
+    // that we can use usize; if we insert after, then we need to
+    // be able to store -1 here to signify that a new
+    // column is to be added at the front of the column list).
+    // If multiple columns need to be inserted at a given
+    // location, they should all be given the same index, as the index
+    // here refers to the original RecordBatch, not the interim state after
+    // a preceding operation.
+    Add {
+        target_type: DataType,
+        value: Option<PrimitiveLiteral>,
+    },
+    // The iceberg spec refers to other permissible schema evolution actions
+    // (see https://iceberg.apache.org/spec/#schema-evolution):
+    // renaming fields, deleting fields and reordering fields.
+    // Renames only affect the schema of the RecordBatch rather than the
+    // columns themselves, so a single updated cached schema can
+    // be re-used and no per-column actions are required.
+    // Deletion and Reorder can be achieved without needing this
+    // post-processing step by using the projection mask.
+}
+
+#[derive(Debug)]
+enum BatchTransform {
+    // Indicates that no changes need to be performed to the RecordBatches
+    // coming in from the stream and that they can be passed through
+    // unmodified
+    PassThrough,
+
+    Modify {
+        // Every transformed RecordBatch will have the same schema. We create 
the
+        // target just once and cache it here. Helpfully, Arc<Schema> is 
needed in
+        // the constructor for RecordBatch, so we don't need an expensive copy
+        // each time we build a new RecordBatch
+        target_schema: Arc<ArrowSchema>,
+
+        // Indicates how each column in the target schema is derived.
+        operations: Vec<ColumnSource>,
+    },
+
+    // Sometimes only the schema will need modifying, for example when
+    // the column names have changed vs the file, but not the column types.
+    // we can avoid a heap allocation per RecordBach in this case by retaining
+    // the existing column Vec.
+    ModifySchema {
+        target_schema: Arc<ArrowSchema>,
+    },
+}
+
+#[derive(Debug)]
+enum SchemaComparison {
+    Equivalent,
+    NameChangesOnly,
+    Different,
+}
+
+#[derive(Debug)]
+pub(crate) struct RecordBatchTransformer {
+    snapshot_schema: Arc<IcebergSchema>,
+    projected_iceberg_field_ids: Vec<i32>,
+
+    // BatchTransform gets lazily constructed based on the schema of
+    // the first RecordBatch we receive from the file
+    batch_transform: Option<BatchTransform>,
+}
+
+impl RecordBatchTransformer {
+    /// Build a RecordBatchTransformer for a given
+    /// Iceberg snapshot schema and list of projected field ids.
+    pub(crate) fn build(
+        snapshot_schema: Arc<IcebergSchema>,
+        projected_iceberg_field_ids: &[i32],
+    ) -> Self {
+        let projected_iceberg_field_ids = if 
projected_iceberg_field_ids.is_empty() {
+            // If the list of field ids is empty, this indicates that we
+            // need to select all fields.
+            // Project all fields in table schema order
+            snapshot_schema
+                .as_struct()
+                .fields()
+                .iter()
+                .map(|field| field.id)
+                .collect()
+        } else {
+            projected_iceberg_field_ids.to_vec()
+        };
+
+        Self {
+            snapshot_schema,
+            projected_iceberg_field_ids,
+            batch_transform: None,
+        }
+    }
+
+    pub(crate) fn process_record_batch(
+        &mut self,
+        record_batch: RecordBatch,
+    ) -> Result<RecordBatch> {
+        Ok(match &self.batch_transform {
+            Some(BatchTransform::PassThrough) => record_batch,
+            Some(BatchTransform::Modify {
+                ref target_schema,
+                ref operations,
+            }) => RecordBatch::try_new(
+                target_schema.clone(),
+                self.transform_columns(record_batch.columns(), operations)?,
+            )?,
+            Some(BatchTransform::ModifySchema { target_schema }) => {
+                record_batch.with_schema(target_schema.clone())?
+            }
+            None => {
+                self.batch_transform = Some(Self::generate_batch_transform(
+                    record_batch.schema_ref(),
+                    self.snapshot_schema.as_ref(),
+                    &self.projected_iceberg_field_ids,
+                )?);
+
+                self.process_record_batch(record_batch)?
+            }
+        })
+    }
+
+    // Compare the schema of the incoming RecordBatches to the schema of
+    // the Iceberg snapshot to determine what, if any, transformation
+    // needs to be applied. If the schemas match, we return 
BatchTransform::PassThrough
+    // to indicate that no changes need to be made. Otherwise, we return a
+    // BatchTransform::Modify containing the target RecordBatch schema and
+    // the list of `ColumnSource`s that indicate how to source each column in
+    // the resulting RecordBatches.
+    fn generate_batch_transform(
+        source_schema: &ArrowSchemaRef,
+        snapshot_schema: &IcebergSchema,
+        projected_iceberg_field_ids: &[i32],
+    ) -> Result<BatchTransform> {
+        let mapped_unprojected_arrow_schema = 
Arc::new(schema_to_arrow_schema(snapshot_schema)?);
+        let field_id_to_mapped_schema_map =
+            
Self::build_field_id_to_arrow_schema_map(&mapped_unprojected_arrow_schema)?;
+
+        // Create a new arrow schema by selecting fields from 
mapped_unprojected,
+        // in the order of the field ids in projected_iceberg_field_ids
+        let fields: Result<Vec<_>> = projected_iceberg_field_ids
+            .iter()
+            .map(|field_id| {
+                Ok(field_id_to_mapped_schema_map
+                    .get(field_id)
+                    .ok_or(Error::new(ErrorKind::Unexpected, "field not 
found"))?
+                    .0
+                    .clone())
+            })
+            .collect();
+
+        let target_schema = Arc::new(ArrowSchema::new(fields?));
+
+        match Self::compare_schemas(source_schema, &target_schema) {
+            SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
+            SchemaComparison::NameChangesOnly => 
Ok(BatchTransform::ModifySchema { target_schema }),
+            SchemaComparison::Different => Ok(BatchTransform::Modify {
+                operations: Self::generate_transform_operations(
+                    source_schema,
+                    snapshot_schema,
+                    projected_iceberg_field_ids,
+                    field_id_to_mapped_schema_map,
+                )?,
+                target_schema,
+            }),
+        }
+    }
+
+    /// Compares the source and target schemas
+    /// Determines if they have changed in any meaningful way:
+    ///  * If they have different numbers of fields, then we need to modify
+    ///    the incoming RecordBatch schema AND columns
+    ///  * If they have the same number of fields, but some of them differ in
+    ///    either data type or nullability, then we need to modify the
+    ///    incoming RecordBatch schema AND columns
+    ///  * If the schemas differ only in the column names, then we need
+    ///    to modify the RecordBatch schema BUT we can keep the
+    ///    original column data unmodified
+    ///  * If the schemas are identical (or differ only in inconsequential
+    ///    ways) then we can pass through the original RecordBatch unmodified
+    fn compare_schemas(
+        source_schema: &ArrowSchemaRef,
+        target_schema: &ArrowSchemaRef,
+    ) -> SchemaComparison {
+        if source_schema.fields().len() != target_schema.fields().len() {
+            return SchemaComparison::Different;
+        }
+
+        let mut names_changed = false;
+
+        for (source_field, target_field) in source_schema
+            .fields()
+            .iter()
+            .zip(target_schema.fields().iter())
+        {
+            if source_field.data_type() != target_field.data_type()
+                || source_field.is_nullable() != target_field.is_nullable()
+            {
+                return SchemaComparison::Different;
+            }
+
+            if source_field.name() != target_field.name() {
+                names_changed = true;
+            }
+        }
+
+        if names_changed {
+            SchemaComparison::NameChangesOnly
+        } else {
+            SchemaComparison::Equivalent
+        }
+    }
+
+    fn generate_transform_operations(
+        source_schema: &ArrowSchemaRef,
+        snapshot_schema: &IcebergSchema,
+        projected_iceberg_field_ids: &[i32],
+        field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+    ) -> Result<Vec<ColumnSource>> {
+        let field_id_to_source_schema_map =
+            Self::build_field_id_to_arrow_schema_map(source_schema)?;
+
+        projected_iceberg_field_ids.iter().map(|field_id|{
+            let (target_field, _) = 
field_id_to_mapped_schema_map.get(field_id).ok_or(
+                Error::new(ErrorKind::Unexpected, "could not find field in 
schema")
+            )?;
+            let target_type = target_field.data_type();
+
+            Ok(if let Some((source_field, source_index)) = 
field_id_to_source_schema_map.get(field_id) {
+                // column present in source
+
+                if source_field.data_type().equals_datatype(target_type) {
+                    // no promotion required
+                    ColumnSource::PassThrough {
+                        source_index: *source_index
+                    }
+                } else {
+                    // promotion required
+                    ColumnSource::Promote {
+                        target_type: target_type.clone(),
+                        source_index: *source_index,
+                    }
+                }
+            } else {
+                // column must be added
+                let iceberg_field = 
snapshot_schema.field_by_id(*field_id).ok_or(
+                    Error::new(ErrorKind::Unexpected, "Field not found in 
snapshot schema")
+                )?;
+
+                let default_value = if let Some(ref iceberg_default_value) =
+                    &iceberg_field.initial_default
+                {
+                    let Literal::Primitive(primitive_literal) = 
iceberg_default_value else {
+                        return Err(Error::new(
+                            ErrorKind::Unexpected,
+                            format!("Default value for column must be 
primitive type, but encountered {:?}", iceberg_default_value)
+                        ));
+                    };
+                    Some(primitive_literal.clone())
+                } else {
+                    None
+                };
+
+                ColumnSource::Add {
+                    value: default_value,
+                    target_type: target_type.clone(),
+                }
+            })
+        }).collect()
+    }
+
+    fn build_field_id_to_arrow_schema_map(
+        source_schema: &SchemaRef,
+    ) -> Result<HashMap<i32, (FieldRef, usize)>> {
+        let mut field_id_to_source_schema = HashMap::new();
+        for (source_field_idx, source_field) in 
source_schema.fields.iter().enumerate() {
+            let this_field_id = source_field
+                .metadata()
+                .get(PARQUET_FIELD_ID_META_KEY)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "field ID not present in parquet metadata",
+                    )
+                })?
+                .parse()
+                .map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("field id not parseable as an i32: {}", e),
+                    )
+                })?;
+
+            field_id_to_source_schema
+                .insert(this_field_id, (source_field.clone(), 
source_field_idx));
+        }
+
+        Ok(field_id_to_source_schema)
+    }
+
+    fn transform_columns(
+        &self,
+        columns: &[Arc<dyn ArrowArray>],
+        operations: &[ColumnSource],
+    ) -> Result<Vec<Arc<dyn ArrowArray>>> {
+        if columns.is_empty() {
+            return Ok(columns.to_vec());
+        }
+        let num_rows = columns[0].len();
+
+        operations
+            .iter()
+            .map(|op| {
+                Ok(match op {
+                    ColumnSource::PassThrough { source_index } => 
columns[*source_index].clone(),
+
+                    ColumnSource::Promote {
+                        target_type,
+                        source_index,
+                    } => cast(&*columns[*source_index], target_type)?,
+
+                    ColumnSource::Add { target_type, value } => {
+                        Self::create_column(target_type, value, num_rows)?
+                    }
+                })
+            })
+            .collect()
+    }
+
+    fn create_column(
+        target_type: &DataType,
+        prim_lit: &Option<PrimitiveLiteral>,
+        num_rows: usize,
+    ) -> Result<ArrayRef> {
+        Ok(match (target_type, prim_lit) {
+            (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
+                Arc::new(BooleanArray::from(vec![*value; num_rows]))
+            }
+            (DataType::Boolean, None) => {
+                let vals: Vec<Option<bool>> = vec![None; num_rows];
+                Arc::new(BooleanArray::from(vals))
+            }
+            (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
+                Arc::new(Int32Array::from(vec![*value; num_rows]))
+            }
+            (DataType::Int32, None) => {
+                let vals: Vec<Option<i32>> = vec![None; num_rows];
+                Arc::new(Int32Array::from(vals))
+            }
+            (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
+                Arc::new(Int64Array::from(vec![*value; num_rows]))
+            }
+            (DataType::Int64, None) => {
+                let vals: Vec<Option<i64>> = vec![None; num_rows];
+                Arc::new(Int64Array::from(vals))
+            }
+            (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
+                Arc::new(Float32Array::from(vec![value.0; num_rows]))
+            }
+            (DataType::Float32, None) => {
+                let vals: Vec<Option<f32>> = vec![None; num_rows];
+                Arc::new(Float32Array::from(vals))
+            }
+            (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
+                Arc::new(Float64Array::from(vec![value.0; num_rows]))
+            }
+            (DataType::Float64, None) => {
+                let vals: Vec<Option<f64>> = vec![None; num_rows];
+                Arc::new(Float64Array::from(vals))
+            }
+            (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
+                Arc::new(StringArray::from(vec![value.clone(); num_rows]))
+            }
+            (DataType::Utf8, None) => {
+                let vals: Vec<Option<String>> = vec![None; num_rows];
+                Arc::new(StringArray::from(vals))
+            }
+            (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
+                Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
+            }
+            (DataType::Binary, None) => {
+                let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
+                Arc::new(BinaryArray::from_opt_vec(vals))
+            }
+            (DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
+            (dt, _) => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!("unexpected target column type {}", dt),
+                ))
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use arrow_array::{
+        Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, 
StringArray,
+    };
+    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+    use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+    use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type};
+
+    #[test]
+    fn build_field_id_to_source_schema_map_works() {
+        let arrow_schema = arrow_schema_already_same_as_target();
+
+        let result =
+            
RecordBatchTransformer::build_field_id_to_arrow_schema_map(&arrow_schema).unwrap();
+
+        let expected = HashMap::from_iter([
+            (10, (arrow_schema.fields()[0].clone(), 0)),
+            (11, (arrow_schema.fields()[1].clone(), 1)),
+            (12, (arrow_schema.fields()[2].clone(), 2)),
+            (14, (arrow_schema.fields()[3].clone(), 3)),
+            (15, (arrow_schema.fields()[4].clone(), 4)),
+        ]);
+
+        assert!(result.eq(&expected));
+    }
+
+    #[test]
+    fn 
processor_returns_properly_shaped_record_batch_when_no_schema_migration_required()
 {
+        let snapshot_schema = Arc::new(iceberg_table_schema());
+        let projected_iceberg_field_ids = [13, 14];
+
+        let mut inst = RecordBatchTransformer::build(snapshot_schema, 
&projected_iceberg_field_ids);
+
+        let result = inst
+            .process_record_batch(source_record_batch_no_migration_required())
+            .unwrap();
+
+        let expected = source_record_batch_no_migration_required();
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn 
processor_returns_properly_shaped_record_batch_when_schema_migration_required() 
{
+        let snapshot_schema = Arc::new(iceberg_table_schema());
+        let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, 
f
+
+        let mut inst = RecordBatchTransformer::build(snapshot_schema, 
&projected_iceberg_field_ids);
+
+        let result = inst.process_record_batch(source_record_batch()).unwrap();
+
+        let expected = expected_record_batch_migration_required();
+
+        assert_eq!(result, expected);
+    }
+
+    pub fn source_record_batch() -> RecordBatch {
+        RecordBatch::try_new(
+            arrow_schema_promotion_addition_and_renaming_required(),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1001), Some(1002), 
Some(1003)])), // b
+                Arc::new(Float32Array::from(vec![
+                    Some(12.125),
+                    Some(23.375),
+                    Some(34.875),
+                ])), // c
+                Arc::new(Int32Array::from(vec![Some(2001), Some(2002), 
Some(2003)])), // d
+                Arc::new(StringArray::from(vec![
+                    Some("Apache"),
+                    Some("Iceberg"),
+                    Some("Rocks"),
+                ])), // e
+            ],
+        )
+        .unwrap()
+    }
+
+    pub fn source_record_batch_no_migration_required() -> RecordBatch {
+        RecordBatch::try_new(
+            arrow_schema_no_promotion_addition_or_renaming_required(),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(2001), Some(2002), 
Some(2003)])), // d
+                Arc::new(StringArray::from(vec![
+                    Some("Apache"),
+                    Some("Iceberg"),
+                    Some("Rocks"),
+                ])), // e
+            ],
+        )
+        .unwrap()
+    }
+
+    pub fn expected_record_batch_migration_required() -> RecordBatch {
+        RecordBatch::try_new(arrow_schema_already_same_as_target(), vec![
+            Arc::new(StringArray::from(Vec::<Option<String>>::from([
+                None, None, None,
+            ]))), // a
+            Arc::new(Int64Array::from(vec![Some(1001), Some(1002), 
Some(1003)])), // b
+            Arc::new(Float64Array::from(vec![
+                Some(12.125),
+                Some(23.375),
+                Some(34.875),
+            ])), // c
+            Arc::new(StringArray::from(vec![
+                Some("Apache"),
+                Some("Iceberg"),
+                Some("Rocks"),
+            ])), // e (d skipped by projection)
+            Arc::new(StringArray::from(vec![
+                Some("(╯°□°)╯"),
+                Some("(╯°□°)╯"),
+                Some("(╯°□°)╯"),
+            ])), // f
+        ])
+        .unwrap()
+    }
+
+    pub fn iceberg_table_schema() -> Schema {
+        Schema::builder()
+            .with_schema_id(2)
+            .with_fields(vec![
+                NestedField::optional(10, "a", 
Type::Primitive(PrimitiveType::String)).into(),
+                NestedField::required(11, "b", 
Type::Primitive(PrimitiveType::Long)).into(),
+                NestedField::required(12, "c", 
Type::Primitive(PrimitiveType::Double)).into(),
+                NestedField::required(13, "d", 
Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::optional(14, "e", 
Type::Primitive(PrimitiveType::String)).into(),
+                NestedField::required(15, "f", 
Type::Primitive(PrimitiveType::String))
+                    .with_initial_default(Literal::string("(╯°□°)╯"))
+                    .into(),
+            ])
+            .build()
+            .unwrap()
+    }
+
+    fn arrow_schema_already_same_as_target() -> Arc<ArrowSchema> {
+        Arc::new(ArrowSchema::new(vec![
+            simple_field("a", DataType::Utf8, true, "10"),
+            simple_field("b", DataType::Int64, false, "11"),
+            simple_field("c", DataType::Float64, false, "12"),
+            simple_field("e", DataType::Utf8, true, "14"),
+            simple_field("f", DataType::Utf8, false, "15"),
+        ]))
+    }
+
+    fn arrow_schema_promotion_addition_and_renaming_required() -> 
Arc<ArrowSchema> {
+        Arc::new(ArrowSchema::new(vec![
+            simple_field("b", DataType::Int32, false, "11"),
+            simple_field("c", DataType::Float32, false, "12"),
+            simple_field("d", DataType::Int32, false, "13"),
+            simple_field("e_old", DataType::Utf8, true, "14"),
+        ]))
+    }
+
+    fn arrow_schema_no_promotion_addition_or_renaming_required() -> 
Arc<ArrowSchema> {
+        Arc::new(ArrowSchema::new(vec![
+            simple_field("d", DataType::Int32, false, "13"),
+            simple_field("e", DataType::Utf8, true, "14"),
+        ]))
+    }
+
+    /// Create a simple arrow field with metadata.
+    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> 
Field {
+        Field::new(name, ty, nullable).with_metadata(HashMap::from([(
+            PARQUET_FIELD_ID_META_KEY.to_string(),
+            value.to_string(),
+        )]))
+    }
+}
diff --git a/crates/iceberg/src/arrow/schema.rs 
b/crates/iceberg/src/arrow/schema.rs
index e73b409c..ab30bed8 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -207,7 +207,7 @@ fn visit_schema<V: ArrowSchemaVisitor>(schema: 
&ArrowSchema, visitor: &mut V) ->
     visitor.schema(schema, results)
 }
 
-/// Convert Arrow schema to ceberg schema.
+/// Convert Arrow schema to Iceberg schema.
 pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
     let mut visitor = ArrowSchemaConverter::new();
     visit_schema(schema, &mut visitor)
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 2b69b470..3f50acac 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -337,6 +337,12 @@ define_from_err!(
     "Failed to send a message to a channel"
 );
 
+define_from_err!(
+    arrow_schema::ArrowError,
+    ErrorKind::Unexpected,
+    "Arrow Schema Error"
+);
+
 define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
 
 /// Converts a timestamp in milliseconds to `DateTime<Utc>`, handling errors.
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index d6c5010d..72cf18d4 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -55,6 +55,7 @@
 
 #[macro_use]
 extern crate derive_builder;
+extern crate core;
 
 mod error;
 pub use error::{Error, ErrorKind, Result};
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index f5cbbcf0..ef0e5f54 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -906,6 +906,33 @@ pub struct FileScanTask {
     pub predicate: Option<BoundPredicate>,
 }
 
+impl FileScanTask {
+    /// Returns the data file path of this file scan task.
+    pub fn data_file_path(&self) -> &str {
+        &self.data_file_path
+    }
+
+    /// Returns the project field id of this file scan task.
+    pub fn project_field_ids(&self) -> &[i32] {
+        &self.project_field_ids
+    }
+
+    /// Returns the predicate of this file scan task.
+    pub fn predicate(&self) -> Option<&BoundPredicate> {
+        self.predicate.as_ref()
+    }
+
+    /// Returns the schema of this file scan task as a reference
+    pub fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    /// Returns the schema of this file scan task as a SchemaRef
+    pub fn schema_ref(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;

Reply via email to