This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d4851f0 feat(reader): Add PartitionSpec support to FileScanTask and RecordBatchTransformer (#1821)
8d4851f0 is described below

commit 8d4851f0db824050c0419bd20d939e5dd3a5a80b
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Nov 13 04:35:12 2025 -0500

    feat(reader): Add PartitionSpec support to FileScanTask and RecordBatchTransformer (#1821)
    
    ## Which issue does this PR close?
    
    Partially address #1749.
    
    ## What changes are included in this PR?
    
    This PR adds partition spec handling to `FileScanTask` and
    `RecordBatchTransformer` to correctly implement the Iceberg spec's
    "Column Projection" rules for fields "not present" in data files.
    
    ### Problem Statement
    
    Prior to this PR, `iceberg-rust`'s `FileScanTask` had no mechanism to
    pass partition information to `RecordBatchTransformer`, causing two
    issues:
    
    1. **Incorrect handling of bucket partitioning**: The reader could not
    distinguish identity transforms (which should use partition metadata
    constants) from non-identity transforms such as bucket/truncate/year/month
    (whose source columns must be read from the data file). For example,
    `bucket(4, id)` stores `id_bucket = 2` (the bucket number) in partition
    metadata, while the actual `id` values (100, 200, 300) exist only in the
    data file. iceberg-rust incorrectly treated bucket-partitioned source
    columns as constants, breaking runtime filtering and returning incorrect
    query results.
    
    2. **Field ID conflicts in add_files scenarios**: When importing Hive
    tables via `add_files`, partition columns can be assigned field IDs that
    conflict with those of the Parquet data columns. Example: the Parquet
    file has field_id=1→"name", but Iceberg expects field_id=1→"id" (a
    partition column). Per the spec, the correct field is "not present" in
    the data file and must be resolved through the name mapping fallback (a
    sketch of that step follows below).
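
    A minimal, self-contained sketch of that name-mapping fallback (the
    `mapping` HashMap stands in for the table's real `NameMapping`; the
    actual logic added by this PR lives in
    `apply_name_mapping_to_arrow_schema()` in `arrow/reader.rs`):

    ```rust
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema as ArrowSchema};

    fn main() {
        // Stand-in for the table's name mapping: column name -> Iceberg field id.
        let mapping: HashMap<String, i32> =
            HashMap::from([("name".to_string(), 2), ("subdept".to_string(), 4)]);

        // Parquet schema as imported from Hive: no field-id metadata at all.
        let file_schema = ArrowSchema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("subdept", DataType::Utf8, true),
        ]);

        // Attach "PARQUET:field_id" metadata based on the column name, leaving
        // unmapped columns without an id so projection can filter them out.
        let mapped: Vec<Field> = file_schema
            .fields()
            .iter()
            .map(|f| {
                let mut md = f.metadata().clone();
                if let Some(id) = mapping.get(f.name()) {
                    md.insert("PARQUET:field_id".to_string(), id.to_string());
                }
                f.as_ref().clone().with_metadata(md)
            })
            .collect();

        let _mapped_schema = Arc::new(ArrowSchema::new(mapped));
    }
    ```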
    
    ### Iceberg Specification Requirements
    
    Per the Iceberg spec
    (https://iceberg.apache.org/spec/#column-projection), when a field ID is
    "not present" in a data file, it must be resolved using these rules:
    
    1. Return the value from partition metadata if an **Identity Transform**
    exists
    2. Use `schema.name-mapping.default` metadata to map field id to columns
    without field id
    3. Return the default value if it has a defined `initial-default`
    4. Return null in all other cases
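
    Condensed into code, that fallback order looks roughly like the sketch
    below (the enum and function are illustrative stand-ins; the real
    implementation in `record_batch_transformer.rs` uses `ColumnSource` and
    `PrimitiveLiteral`, and rule #2 is applied earlier by `ArrowReader`):

    ```rust
    use std::collections::HashMap;

    // Stand-in for the crate's ColumnSource, for illustration only.
    #[derive(Debug, PartialEq)]
    enum Resolved {
        Constant(&'static str), // rule 1: identity-transform partition constant
        FromFile(usize),        // field id found in the data file (column index)
        Default(&'static str),  // rule 3: initial-default
        Null,                   // rule 4
    }

    fn resolve(
        field_id: i32,
        identity_constants: &HashMap<i32, &'static str>,
        source_index: Option<usize>,
        initial_default: Option<&'static str>,
    ) -> Resolved {
        if let Some(&c) = identity_constants.get(&field_id) {
            Resolved::Constant(c)
        } else if let Some(idx) = source_index {
            Resolved::FromFile(idx)
        } else if let Some(d) = initial_default {
            Resolved::Default(d)
        } else {
            Resolved::Null
        }
    }

    fn main() {
        // dept (field id 3) is identity-partitioned: constant from metadata.
        let constants = HashMap::from([(3, "engineering")]);
        assert_eq!(resolve(3, &constants, None, None), Resolved::Constant("engineering"));
        // id (field id 1) is bucket-partitioned: read from the data file (column 0).
        assert_eq!(resolve(1, &constants, Some(0), None), Resolved::FromFile(0));
        // A column added by schema evolution with initial-default "hr" (rule 3).
        assert_eq!(resolve(4, &constants, None, Some("hr")), Resolved::Default("hr"));
        // A newly added column with no default resolves to null (rule 4).
        assert_eq!(resolve(5, &constants, None, None), Resolved::Null);
    }
    ```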
    
    **Why this matters:**
    - **Identity transforms** (e.g., `identity(dept)`) store actual column
    values in partition metadata that can be used as constants without
    reading the data file
    - **Non-identity transforms** (e.g., `bucket(4, id)`, `day(timestamp)`)
    store transformed values in partition metadata (e.g., bucket number 2,
    not the actual `id` values 100, 200, 300) and must read source columns
    from the data file
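
    For example, the new `constants_map()` helper added in this diff produces
    no constants for a bucket spec, so the source column must come from the
    file. A rough sketch, reusing the construction shown in this PR's tests
    (it would have to live inside `record_batch_transformer.rs`, since
    `constants_map()` is private to that module):

    ```rust
    use std::sync::Arc;

    use crate::spec::{
        Literal, NestedField, PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type,
    };

    fn bucket_partition_produces_no_constants() {
        // Table schema: id, name.
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Partition spec: bucket(4, id) - a non-identity transform on `id`.
        let spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("id", "id_bucket", Transform::Bucket(4))
            .unwrap()
            .build()
            .unwrap();

        // The manifest's partition tuple holds the bucket number (2), not an `id` value.
        let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);

        // No identity transforms -> no constants; `id` is read from the data file.
        assert!(constants_map(&spec, &partition_data).is_empty());
    }
    ```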
    
    ### Changes Made
    
    1. **Added partition fields to `FileScanTask`** (`scan/task.rs`):
    - `partition: Option<Struct>` - Partition data from manifest entry
    - `partition_spec: Option<Arc<PartitionSpec>>` - For transform-aware
    constant detection
    - `name_mapping: Option<Arc<NameMapping>>` - Name mapping from table
    metadata
    
    2. **Implemented `constants_map()` function**
    (`arrow/record_batch_transformer.rs`):
    - Replicates Java's `PartitionUtil.constantsMap()` behavior
    - Only includes fields where transform is `Transform::Identity`
    - Used to determine which fields use partition metadata constants vs.
    reading from data files
    
    3. **Enhanced `RecordBatchTransformer`**
    (`arrow/record_batch_transformer.rs`):
    - Added `RecordBatchTransformerBuilder` with a `with_partition()` method
    that accepts the partition spec and partition data
    - Implements spec rules #1, #3 and #4 for column resolution with
    identity-transform awareness (rule #2 is applied upfront in `ArrowReader`)
    - Relies on field IDs already resolved by `ArrowReader` (embedded IDs,
    name mapping, or position-based fallback), so no per-batch field ID
    conflict detection is needed
    
    4. **Updated `ArrowReader`** (`arrow/reader.rs`):
    - Builds the transformer via `RecordBatchTransformerBuilder`, calling
    `with_partition()` when partition information is available (see the
    condensed usage sketch after this list)
    - Applies `schema.name-mapping.default` to the file's Arrow schema when
    the Parquet file lacks embedded field IDs (spec rule #2), falling back
    to position-based IDs otherwise
    
    5. **Updated manifest entry processing** (`scan/context.rs`):
    - Populates partition fields in `FileScanTask` from manifest entry data
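
    In the reader, the new builder is used roughly as follows (condensed from
    the `arrow/reader.rs` changes in this diff; `task` is the `FileScanTask`
    being read):

    ```rust
    // Start from the projected schema and field ids of the scan task.
    let mut builder =
        RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());

    // Partition spec and data must be supplied together: the spec says which
    // fields are identity-partitioned, the data supplies their constants.
    if let (Some(partition_spec), Some(partition_data)) =
        (task.partition_spec.clone(), task.partition.clone())
    {
        builder = builder.with_partition(partition_spec, partition_data);
    }

    let mut record_batch_transformer = builder.build();
    ```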
    
    ### Tests Added
    
    1. **`bucket_partitioning_reads_source_column_from_file`** - Verifies
    that bucket-partitioned source columns are read from data files (not
    treated as constants from partition metadata)
    
    2. **`identity_partition_uses_constant_from_metadata`** - Verifies that
    identity-transformed fields correctly use partition metadata constants
    
    3. **`test_bucket_partitioning_with_renamed_source_column`** - Verifies
    field-ID-based mapping works despite column rename
    
    4. **`add_files_partition_columns_without_field_ids`** - Verifies name
    mapping resolution for Hive table imports without field IDs (spec rule
    #2)
    
    5. **`add_files_with_true_field_id_conflict`** - Verifies correct field
    ID conflict detection with name mapping fallback (spec rule #2)
    
    6. **`test_all_four_spec_rules`** - Integration test verifying all 4
    spec rules work together
    
    ## Are these changes tested?
    
    Yes. There are 6 new unit tests covering all 4 Iceberg spec rules. This
    change also fixed approximately 50 previously failing Iceberg Java tests
    when run with DataFusion Comet's experimental
    https://github.com/apache/datafusion-comet/pull/2528 PR.
    
    ---------
    
    Co-authored-by: Renjie Liu <[email protected]>
---
 .../src/arrow/caching_delete_file_loader.rs        |   3 +
 crates/iceberg/src/arrow/delete_file_loader.rs     |   4 +-
 crates/iceberg/src/arrow/delete_filter.rs          |   6 +
 crates/iceberg/src/arrow/reader.rs                 | 339 +++++++-
 .../iceberg/src/arrow/record_batch_transformer.rs  | 894 +++++++++++++++++++--
 crates/iceberg/src/scan/context.rs                 |   7 +
 crates/iceberg/src/scan/mod.rs                     |   6 +
 crates/iceberg/src/scan/task.rs                    |  54 +-
 8 files changed, 1222 insertions(+), 91 deletions(-)

diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 8a3ab3a9..f1c4f86f 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -881,6 +881,9 @@ mod tests {
             project_field_ids: vec![2, 3],
             predicate: None,
             deletes: vec![pos_del, eq_del],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
 
         // Load the deletes - should handle both types without error
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index c0b1392d..e12daf53 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use futures::{StreamExt, TryStreamExt};
 
 use crate::arrow::ArrowReader;
-use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
 use crate::spec::{Schema, SchemaRef};
@@ -82,7 +82,7 @@ impl BasicDeleteFileLoader {
         equality_ids: &[i32],
     ) -> Result<ArrowRecordBatchStream> {
         let mut record_batch_transformer =
-            RecordBatchTransformer::build(target_schema.clone(), equality_ids);
+            RecordBatchTransformerBuilder::new(target_schema.clone(), 
equality_ids).build();
 
         let record_batch_stream = record_batch_stream.map(move |record_batch| {
             record_batch.and_then(|record_batch| {
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 4250974b..14b5124e 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -341,6 +341,9 @@ pub(crate) mod tests {
                 project_field_ids: vec![],
                 predicate: None,
                 deletes: vec![pos_del_1, pos_del_2.clone()],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             },
             FileScanTask {
                 start: 0,
@@ -352,6 +355,9 @@ pub(crate) mod tests {
                 project_field_ids: vec![],
                 predicate: None,
                 deletes: vec![pos_del_3],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             },
         ];
 
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index fed8f19c..ab5a96f7 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -45,7 +45,7 @@ use parquet::file::metadata::{
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
 use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
-use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::delete_vector::DeleteVector;
 use crate::error::Result;
@@ -55,7 +55,7 @@ use 
crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
+use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, 
Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
 
@@ -181,7 +181,8 @@ impl ArrowReader {
         let should_load_page_index =
             (row_selection_enabled && task.predicate.is_some()) || 
!task.deletes.is_empty();
 
-        let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, 
task.schema.clone());
+        let delete_filter_rx =
+            delete_file_loader.load_deletes(&task.deletes, 
Arc::clone(&task.schema));
 
         // Migrated tables lack field IDs, requiring us to inspect the schema 
to choose
         // between field-ID-based or position-based projection
@@ -193,7 +194,9 @@ impl ArrowReader {
         )
         .await?;
 
-        // Parquet files from Hive/Spark migrations lack field IDs in their 
metadata
+        // Check if Parquet file has embedded field IDs
+        // Corresponds to Java's ParquetSchemaUtil.hasIds()
+        // Reference: 
parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
         let missing_field_ids = initial_stream_builder
             .schema()
             .fields()
@@ -201,11 +204,38 @@ impl ArrowReader {
             .next()
             .is_some_and(|f| 
f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
 
-        // Adding position-based fallback IDs at schema level (not per-batch) 
enables projection
-        // on files that lack embedded field IDs. We recreate the builder to 
apply the modified schema.
+        // Three-branch schema resolution strategy matching Java's ReadConf 
constructor
+        //
+        // Per Iceberg spec Column Projection rules:
+        // "Columns in Iceberg data files are selected by field id. The table 
schema's column
+        //  names and order may change after a data file is written, and 
projection must be done
+        //  using field ids."
+        // https://iceberg.apache.org/spec/#column-projection
+        //
+        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via 
add_files),
+        // we must assign field IDs BEFORE reading data to enable correct 
projection.
+        //
+        // Java's ReadConf determines field ID strategy:
+        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use 
pruneColumns()
+        // - Branch 2: nameMapping present → applyNameMapping(), then 
pruneColumns()
+        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
         let mut record_batch_stream_builder = if missing_field_ids {
-            let arrow_schema =
-                
add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema());
+            // Parquet file lacks field IDs - must assign them before reading
+            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
+                // Branch 2: Apply name mapping to assign correct Iceberg 
field IDs
+                // Per spec rule #2: "Use schema.name-mapping.default metadata 
to map field id
+                // to columns without field id"
+                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
+                apply_name_mapping_to_arrow_schema(
+                    Arc::clone(initial_stream_builder.schema()),
+                    name_mapping,
+                )?
+            } else {
+                // Branch 3: No name mapping - use position-based fallback IDs
+                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
+                
add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
+            };
+
             let options = ArrowReaderOptions::new().with_schema(arrow_schema);
 
             Self::create_parquet_record_batch_stream_builder(
@@ -216,11 +246,14 @@ impl ArrowReader {
             )
             .await?
         } else {
+            // Branch 1: File has embedded field IDs - trust them
             initial_stream_builder
         };
 
-        // Fallback IDs don't match Parquet's embedded field IDs (since they 
don't exist),
-        // so we must use position-based projection instead of field-ID 
matching
+        // Create projection mask based on field IDs
+        // - If file has embedded IDs: field-ID-based projection 
(missing_field_ids=false)
+        // - If name mapping applied: field-ID-based projection 
(missing_field_ids=true but IDs now match)
+        // - If fallback IDs: position-based projection 
(missing_field_ids=true)
         let projection_mask = Self::get_arrow_projection_mask(
             &task.project_field_ids,
             &task.schema,
@@ -234,9 +267,18 @@ impl ArrowReader {
 
         // RecordBatchTransformer performs any transformations required on the 
RecordBatches
         // that come back from the file, such as type promotion, default 
column insertion
-        // and column re-ordering
-        let mut record_batch_transformer =
-            RecordBatchTransformer::build(task.schema_ref(), 
task.project_field_ids());
+        // and column re-ordering.
+        let mut record_batch_transformer_builder =
+            RecordBatchTransformerBuilder::new(task.schema_ref(), 
task.project_field_ids());
+
+        if let (Some(partition_spec), Some(partition_data)) =
+            (task.partition_spec.clone(), task.partition.clone())
+        {
+            record_batch_transformer_builder =
+                
record_batch_transformer_builder.with_partition(partition_spec, partition_data);
+        }
+
+        let mut record_batch_transformer = 
record_batch_transformer_builder.build();
 
         if let Some(batch_size) = batch_size {
             record_batch_stream_builder = 
record_batch_stream_builder.with_batch_size(batch_size);
@@ -919,6 +961,77 @@ fn build_fallback_field_id_map(parquet_schema: 
&SchemaDescriptor) -> HashMap<i32
     column_map
 }
 
+/// Apply name mapping to Arrow schema for Parquet files lacking field IDs.
+///
+/// Assigns Iceberg field IDs based on column names using the name mapping,
+/// enabling correct projection on migrated files (e.g., from Hive/Spark via 
add_files).
+///
+/// Per Iceberg spec Column Projection rule #2:
+/// "Use schema.name-mapping.default metadata to map field id to columns 
without field id"
+/// https://iceberg.apache.org/spec/#column-projection
+///
+/// Corresponds to Java's ParquetSchemaUtil.applyNameMapping() and 
ApplyNameMapping visitor.
+/// The key difference is Java operates on Parquet MessageType, while we 
operate on Arrow Schema.
+///
+/// # Arguments
+/// * `arrow_schema` - Arrow schema from Parquet file (without field IDs)
+/// * `name_mapping` - Name mapping from table metadata 
(TableProperties.DEFAULT_NAME_MAPPING)
+///
+/// # Returns
+/// Arrow schema with field IDs assigned based on name mapping
+fn apply_name_mapping_to_arrow_schema(
+    arrow_schema: ArrowSchemaRef,
+    name_mapping: &NameMapping,
+) -> Result<Arc<ArrowSchema>> {
+    debug_assert!(
+        arrow_schema
+            .fields()
+            .iter()
+            .next()
+            .is_none_or(|f| 
f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
+        "Schema already has field IDs - name mapping should not be applied"
+    );
+
+    use arrow_schema::Field;
+
+    let fields_with_mapped_ids: Vec<_> = arrow_schema
+        .fields()
+        .iter()
+        .map(|field| {
+            // Look up this column name in name mapping to get the Iceberg 
field ID.
+            // Corresponds to Java's ApplyNameMapping visitor which calls
+            // nameMapping.find(currentPath()) and returns field.withId() if 
found.
+            //
+            // If the field isn't in the mapping, leave it WITHOUT assigning 
an ID
+            // (matching Java's behavior of returning the field unchanged).
+            // Later, during projection, fields without IDs are filtered out.
+            let mapped_field_opt = name_mapping
+                .fields()
+                .iter()
+                .find(|f| f.names().contains(&field.name().to_string()));
+
+            let mut metadata = field.metadata().clone();
+
+            if let Some(mapped_field) = mapped_field_opt {
+                if let Some(field_id) = mapped_field.field_id() {
+                    // Field found in mapping with a field_id → assign it
+                    metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), 
field_id.to_string());
+                }
+                // If field_id is None, leave the field without an ID (will be 
filtered by projection)
+            }
+            // If field not found in mapping, leave it without an ID (will be 
filtered by projection)
+
+            Field::new(field.name(), field.data_type().clone(), 
field.is_nullable())
+                .with_metadata(metadata)
+        })
+        .collect();
+
+    Ok(Arc::new(ArrowSchema::new_with_metadata(
+        fields_with_mapped_ids,
+        arrow_schema.metadata().clone(),
+    )))
+}
+
 /// Add position-based fallback field IDs to Arrow schema for Parquet files 
lacking them.
 /// Enables projection on migrated files (e.g., from Hive/Spark).
 ///
@@ -1948,6 +2061,9 @@ message schema {
                 project_field_ids: vec![1],
                 predicate: Some(predicate.bind(schema, true).unwrap()),
                 deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -2266,6 +2382,9 @@ message schema {
             project_field_ids: vec![1],
             predicate: None,
             deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
 
         // Task 2: read the second and third row groups
@@ -2279,6 +2398,9 @@ message schema {
             project_field_ids: vec![1],
             predicate: None,
             deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
 
         let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as 
FileScanTaskStream;
@@ -2403,6 +2525,9 @@ message schema {
                 project_field_ids: vec![1, 2], // Request both columns 'a' and 
'b'
                 predicate: None,
                 deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -2571,6 +2696,9 @@ message schema {
                 partition_spec_id: 0,
                 equality_ids: None,
             }],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as 
FileScanTaskStream;
@@ -2786,6 +2914,9 @@ message schema {
                 partition_spec_id: 0,
                 equality_ids: None,
             }],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as 
FileScanTaskStream;
@@ -2994,6 +3125,9 @@ message schema {
                 partition_spec_id: 0,
                 equality_ids: None,
             }],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as 
FileScanTaskStream;
@@ -3094,6 +3228,9 @@ message schema {
                 project_field_ids: vec![1, 2],
                 predicate: None,
                 deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3188,6 +3325,9 @@ message schema {
                 project_field_ids: vec![1, 3],
                 predicate: None,
                 deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3271,6 +3411,9 @@ message schema {
                 project_field_ids: vec![1, 2, 3],
                 predicate: None,
                 deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3368,6 +3511,9 @@ message schema {
                 project_field_ids: vec![1, 2],
                 predicate: None,
                 deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3494,6 +3640,9 @@ message schema {
                 project_field_ids: vec![1, 2],
                 predicate: None,
                 deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3587,6 +3736,9 @@ message schema {
                 project_field_ids: vec![1, 5, 2],
                 predicate: None,
                 deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3693,6 +3845,9 @@ message schema {
                 project_field_ids: vec![1, 2, 3],
                 predicate: Some(predicate.bind(schema, true).unwrap()),
                 deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3708,4 +3863,162 @@ message schema {
         // Should return empty results
         assert!(result.is_empty() || result.iter().all(|batch| 
batch.num_rows() == 0));
     }
+
+    /// Test bucket partitioning reads source column from data file (not 
partition metadata).
+    ///
+    /// This is an integration test verifying the complete ArrowReader 
pipeline with bucket partitioning.
+    /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., 
testRenamedSourceColumnTable).
+    ///
+    /// # Iceberg Spec Requirements
+    ///
+    /// Per the Iceberg spec "Column Projection" section:
+    /// > "Return the value from partition metadata if an **Identity 
Transform** exists for the field"
+    ///
+    /// This means:
+    /// - Identity transforms (e.g., `identity(dept)`) use constants from 
partition metadata
+    /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source 
columns from data files
+    /// - Partition metadata for bucket transforms stores bucket numbers 
(0-3), NOT source values
+    ///
+    /// Java's PartitionUtil.constantsMap() implements this via:
+    /// ```java
+    /// if (field.transform().isIdentity()) {
+    ///     idToConstant.put(field.sourceId(), converted);
+    /// }
+    /// ```
+    ///
+    /// # What This Test Verifies
+    ///
+    /// This test ensures the full ArrowReader → RecordBatchTransformer 
pipeline correctly handles
+    /// bucket partitioning when FileScanTask provides partition_spec and 
partition_data:
+    ///
+    /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
+    /// - FileScanTask specifies partition_spec with bucket(4, id) and 
partition_data with bucket=1
+    /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned 
field from constants
+    /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
+    /// - Values are NOT replaced with constant 1 from partition metadata
+    ///
+    /// # Why This Matters
+    ///
+    /// Without correct handling:
+    /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
+    /// - Query results would be incorrect (all rows would have id=1)
+    /// - Bucket partitioning would be unusable for query optimization
+    ///
+    /// # References
+    /// - Iceberg spec: format/spec.md "Column Projection" + "Partition 
Transforms"
+    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
+    /// - Java impl: 
core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+    #[tokio::test]
+    async fn test_bucket_partitioning_reads_source_column_from_file() {
+        use arrow_array::Int32Array;
+
+        use crate::spec::{Literal, PartitionSpec, Struct, Transform};
+
+        // Iceberg schema with id and name columns
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Partition spec: bucket(4, id)
+        let partition_spec = Arc::new(
+            PartitionSpec::builder(schema.clone())
+                .with_spec_id(0)
+                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
+                .unwrap()
+                .build()
+                .unwrap(),
+        );
+
+        // Partition data: bucket value is 1
+        let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
+
+        // Create Arrow schema with field IDs for Parquet file
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, 
true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        // Write Parquet file with data
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as 
ArrayRef;
+        let name_data =
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", 
"Dave"])) as ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, 
name_data]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let file = File::create(format!("{}/data.parquet", 
&table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Read the Parquet file with partition spec and data
+        let reader = ArrowReaderBuilder::new(file_io).build();
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/data.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: Some(partition_data),
+                partition_spec: Some(partition_spec),
+                name_mapping: None,
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Verify we got the correct data
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 4);
+
+        // The id column MUST contain actual values from the Parquet file [1, 
5, 9, 13],
+        // NOT the constant partition value 1
+        let id_col = batch
+            .column(0)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(id_col.value(0), 1);
+        assert_eq!(id_col.value(1), 5);
+        assert_eq!(id_col.value(2), 9);
+        assert_eq!(id_col.value(3), 13);
+
+        let name_col = batch.column(1).as_string::<i32>();
+        assert_eq!(name_col.value(0), "Alice");
+        assert_eq!(name_col.value(1), "Bob");
+        assert_eq!(name_col.value(2), "Charlie");
+        assert_eq!(name_col.value(3), "Dave");
+    }
 }
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 5fbbbb10..e7d8b8f0 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -29,9 +29,44 @@ use arrow_schema::{
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
 use crate::arrow::schema_to_arrow_schema;
-use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
+use crate::spec::{
+    Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, 
Transform,
+};
 use crate::{Error, ErrorKind, Result};
 
+/// Build a map of field ID to constant value for identity-partitioned fields.
+///
+/// Implements Iceberg spec "Column Projection" rule #1: use partition 
metadata constants
+/// only for identity-transformed fields. Non-identity transforms (bucket, 
truncate, year, etc.)
+/// store derived values in partition metadata, so source columns must be read 
from data files.
+///
+/// Example: For `bucket(4, id)`, partition metadata has `id_bucket = 2` 
(bucket number),
+/// but the actual `id` values (100, 200, 300) are only in the data file.
+///
+/// Matches Java's `PartitionUtil.constantsMap()` which filters `if 
(field.transform().isIdentity())`.
+///
+/// # References
+/// - Spec: https://iceberg.apache.org/spec/#column-projection
+/// - Java: 
core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:constantsMap()
+fn constants_map(
+    partition_spec: &PartitionSpec,
+    partition_data: &Struct,
+) -> HashMap<i32, PrimitiveLiteral> {
+    let mut constants = HashMap::new();
+
+    for (pos, field) in partition_spec.fields().iter().enumerate() {
+        // Only identity transforms should use constant values from partition 
metadata
+        if matches!(field.transform, Transform::Identity) {
+            // Get the partition value for this field
+            if let Some(Literal::Primitive(value)) = &partition_data[pos] {
+                constants.insert(field.source_id, value.clone());
+            }
+        }
+    }
+
+    constants
+}
+
 /// Indicates how a particular column in a processed RecordBatch should
 /// be sourced.
 #[derive(Debug)]
@@ -107,32 +142,107 @@ enum SchemaComparison {
     Different,
 }
 
+/// Builder for RecordBatchTransformer to improve ergonomics when constructing 
with optional parameters.
+///
+/// See [`RecordBatchTransformer`] for details on partition spec and partition 
data.
 #[derive(Debug)]
-pub(crate) struct RecordBatchTransformer {
+pub(crate) struct RecordBatchTransformerBuilder {
     snapshot_schema: Arc<IcebergSchema>,
     projected_iceberg_field_ids: Vec<i32>,
-
-    // BatchTransform gets lazily constructed based on the schema of
-    // the first RecordBatch we receive from the file
-    batch_transform: Option<BatchTransform>,
+    partition_spec: Option<Arc<PartitionSpec>>,
+    partition_data: Option<Struct>,
 }
 
-impl RecordBatchTransformer {
-    /// Build a RecordBatchTransformer for a given
-    /// Iceberg snapshot schema and list of projected field ids.
-    pub(crate) fn build(
+impl RecordBatchTransformerBuilder {
+    pub(crate) fn new(
         snapshot_schema: Arc<IcebergSchema>,
         projected_iceberg_field_ids: &[i32],
     ) -> Self {
-        let projected_iceberg_field_ids = projected_iceberg_field_ids.to_vec();
-
         Self {
             snapshot_schema,
-            projected_iceberg_field_ids,
+            projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(),
+            partition_spec: None,
+            partition_data: None,
+        }
+    }
+
+    /// Set partition spec and data together for identifying 
identity-transformed partition columns.
+    ///
+    /// Both partition_spec and partition_data must be provided together since 
the spec defines
+    /// which fields are identity-partitioned, and the data provides their 
constant values.
+    /// One without the other cannot produce a valid constants map.
+    pub(crate) fn with_partition(
+        mut self,
+        partition_spec: Arc<PartitionSpec>,
+        partition_data: Struct,
+    ) -> Self {
+        self.partition_spec = Some(partition_spec);
+        self.partition_data = Some(partition_data);
+        self
+    }
+
+    pub(crate) fn build(self) -> RecordBatchTransformer {
+        RecordBatchTransformer {
+            snapshot_schema: self.snapshot_schema,
+            projected_iceberg_field_ids: self.projected_iceberg_field_ids,
+            partition_spec: self.partition_spec,
+            partition_data: self.partition_data,
             batch_transform: None,
         }
     }
+}
+
+/// Transforms RecordBatches from Parquet files to match the Iceberg table 
schema.
+///
+/// Handles schema evolution, column reordering, type promotion, and 
implements the Iceberg spec's
+/// "Column Projection" rules for resolving field IDs "not present" in data 
files:
+/// 1. Return the value from partition metadata if an Identity Transform exists
+/// 2. Use schema.name-mapping.default metadata to map field id to columns 
without field id (applied in ArrowReader)
+/// 3. Return the default value if it has a defined initial-default
+/// 4. Return null in all other cases
+///
+/// # Field ID Resolution
+///
+/// Field ID resolution happens in ArrowReader before data is read (matching 
Java's ReadConf):
+/// - If file has embedded field IDs: trust them (ParquetSchemaUtil.hasIds() = 
true)
+/// - If file lacks IDs and name_mapping exists: apply name mapping 
(ParquetSchemaUtil.applyNameMapping())
+/// - If file lacks IDs and no name_mapping: use position-based fallback 
(ParquetSchemaUtil.addFallbackIds())
+///
+/// By the time RecordBatchTransformer processes data, all field IDs are 
trustworthy.
+/// This transformer only handles remaining projection rules (#1, #3, #4) for 
fields still "not present".
+///
+/// # Partition Spec and Data
+///
+/// **Bucket partitioning**: Distinguish identity transforms (use partition 
metadata constants)
+/// from non-identity transforms like bucket (read from data file) to enable 
runtime filtering on
+/// bucket-partitioned columns. For example, `bucket(4, id)` stores only the 
bucket number in
+/// partition metadata, so actual `id` values must be read from the data file.
+///
+/// # References
+/// - Spec: https://iceberg.apache.org/spec/#column-projection
+/// - Java: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java 
(field ID resolution)
+/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java 
(partition constants)
+#[derive(Debug)]
+pub(crate) struct RecordBatchTransformer {
+    snapshot_schema: Arc<IcebergSchema>,
+    projected_iceberg_field_ids: Vec<i32>,
+
+    /// Partition spec for identifying identity-transformed partition columns 
(spec rule #1).
+    /// Only fields with identity transforms use partition data constants; 
non-identity transforms
+    /// (bucket, truncate, etc.) must read source columns from data files.
+    partition_spec: Option<Arc<PartitionSpec>>,
 
+    /// Partition data providing constant values for identity-transformed 
partition columns (spec rule #1).
+    /// For example, in a file at path `dept=engineering/file.parquet`, this 
would contain
+    /// the value "engineering" for the dept field.
+    partition_data: Option<Struct>,
+
+    // BatchTransform gets lazily constructed based on the schema of
+    // the first RecordBatch we receive from the file
+    batch_transform: Option<BatchTransform>,
+}
+
+impl RecordBatchTransformer {
     pub(crate) fn process_record_batch(
         &mut self,
         record_batch: RecordBatch,
@@ -147,7 +257,7 @@ impl RecordBatchTransformer {
                     .with_match_field_names(false)
                     .with_row_count(Some(record_batch.num_rows()));
                 RecordBatch::try_new_with_options(
-                    target_schema.clone(),
+                    Arc::clone(target_schema),
                     self.transform_columns(record_batch.columns(), 
operations)?,
                     &options,
                 )?
@@ -157,7 +267,7 @@ impl RecordBatchTransformer {
                     .with_match_field_names(false)
                     .with_row_count(Some(record_batch.num_rows()));
                 RecordBatch::try_new_with_options(
-                    target_schema.clone(),
+                    Arc::clone(target_schema),
                     record_batch.columns().to_vec(),
                     &options,
                 )?
@@ -167,6 +277,8 @@ impl RecordBatchTransformer {
                     record_batch.schema_ref(),
                     self.snapshot_schema.as_ref(),
                     &self.projected_iceberg_field_ids,
+                    self.partition_spec.as_ref().map(|s| s.as_ref()),
+                    self.partition_data.as_ref(),
                 )?);
 
                 self.process_record_batch(record_batch)?
@@ -185,6 +297,8 @@ impl RecordBatchTransformer {
         source_schema: &ArrowSchemaRef,
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
+        partition_spec: Option<&PartitionSpec>,
+        partition_data: Option<&Struct>,
     ) -> Result<BatchTransform> {
         let mapped_unprojected_arrow_schema = 
Arc::new(schema_to_arrow_schema(snapshot_schema)?);
         let field_id_to_mapped_schema_map =
@@ -205,6 +319,12 @@ impl RecordBatchTransformer {
 
         let target_schema = Arc::new(ArrowSchema::new(fields?));
 
+        let constants_map = if let (Some(spec), Some(data)) = (partition_spec, 
partition_data) {
+            constants_map(spec, data)
+        } else {
+            HashMap::new()
+        };
+
         match Self::compare_schemas(source_schema, &target_schema) {
             SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
             SchemaComparison::NameChangesOnly => 
Ok(BatchTransform::ModifySchema { target_schema }),
@@ -214,6 +334,8 @@ impl RecordBatchTransformer {
                     snapshot_schema,
                     projected_iceberg_field_ids,
                     field_id_to_mapped_schema_map,
+                    constants_map,
+                    partition_spec,
                 )?,
                 target_schema,
             }),
@@ -270,57 +392,92 @@ impl RecordBatchTransformer {
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
         field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+        constants_map: HashMap<i32, PrimitiveLiteral>,
+        _partition_spec: Option<&PartitionSpec>,
     ) -> Result<Vec<ColumnSource>> {
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
 
-        projected_iceberg_field_ids.iter().map(|field_id|{
-            let (target_field, _) = 
field_id_to_mapped_schema_map.get(field_id).ok_or(
-                Error::new(ErrorKind::Unexpected, "could not find field in 
schema")
-            )?;
-            let target_type = target_field.data_type();
-
-            Ok(if let Some((source_field, source_index)) = 
field_id_to_source_schema_map.get(field_id) {
-                // column present in source
+        projected_iceberg_field_ids
+            .iter()
+            .map(|field_id| {
+                let (target_field, _) =
+                    field_id_to_mapped_schema_map
+                        .get(field_id)
+                        .ok_or(Error::new(
+                            ErrorKind::Unexpected,
+                            "could not find field in schema",
+                        ))?;
+                let target_type = target_field.data_type();
 
-                if source_field.data_type().equals_datatype(target_type) {
-                    // no promotion required
-                    ColumnSource::PassThrough {
-                        source_index: *source_index
+                let iceberg_field = 
snapshot_schema.field_by_id(*field_id).ok_or(Error::new(
+                    ErrorKind::Unexpected,
+                    "Field not found in snapshot schema",
+                ))?;
+
+                // Iceberg spec's "Column Projection" rules 
(https://iceberg.apache.org/spec/#column-projection).
+                // For fields "not present" in data files:
+                // 1. Use partition metadata (identity transforms only)
+                // 2. Use name mapping
+                // 3. Use initial_default
+                // 4. Return null
+                //
+                // Why check partition constants before Parquet field IDs 
(Java: BaseParquetReaders.java:299):
+                // In add_files scenarios, partition columns may exist in BOTH 
Parquet AND partition metadata.
+                // Partition metadata is authoritative - it defines which 
partition this file belongs to.
+
+                // Field ID resolution now happens in ArrowReader via:
+                // 1. Embedded field IDs (ParquetSchemaUtil.hasIds() = true) - 
trust them
+                // 2. Name mapping (ParquetSchemaUtil.applyNameMapping()) - 
applied upfront
+                // 3. Position-based fallback 
(ParquetSchemaUtil.addFallbackIds()) - applied upfront
+                //
+                // At this point, all field IDs in the source schema are 
trustworthy.
+                // No conflict detection needed - schema resolution happened 
in reader.rs.
+                let field_by_id = 
field_id_to_source_schema_map.get(field_id).map(
+                    |(source_field, source_index)| {
+                        if 
source_field.data_type().equals_datatype(target_type) {
+                            ColumnSource::PassThrough {
+                                source_index: *source_index,
+                            }
+                        } else {
+                            ColumnSource::Promote {
+                                target_type: target_type.clone(),
+                                source_index: *source_index,
+                            }
+                        }
+                    },
+                );
+
+                // Apply spec's fallback steps for "not present" fields.
+                let column_source = if let Some(constant_value) = 
constants_map.get(field_id) {
+                    // Rule #1: Identity partition constant
+                    ColumnSource::Add {
+                        value: Some(constant_value.clone()),
+                        target_type: target_type.clone(),
                     }
+                } else if let Some(source) = field_by_id {
+                    source
                 } else {
-                    // promotion required
-                    ColumnSource::Promote {
+                    // Rules #2, #3 and #4:
+                    // Rule #2 (name mapping) was already applied in reader.rs 
if needed.
+                    // If field_id is still not found, the column doesn't 
exist in the Parquet file.
+                    // Fall through to rule #3 (initial_default) or rule #4 
(null).
+                    let default_value = 
iceberg_field.initial_default.as_ref().and_then(|lit| {
+                        if let Literal::Primitive(prim) = lit {
+                            Some(prim.clone())
+                        } else {
+                            None
+                        }
+                    });
+                    ColumnSource::Add {
+                        value: default_value,
                         target_type: target_type.clone(),
-                        source_index: *source_index,
                     }
-                }
-            } else {
-                // column must be added
-                let iceberg_field = 
snapshot_schema.field_by_id(*field_id).ok_or(
-                    Error::new(ErrorKind::Unexpected, "Field not found in 
snapshot schema")
-                )?;
-
-                let default_value = if let Some(iceberg_default_value) =
-                    &iceberg_field.initial_default
-                {
-                    let Literal::Primitive(primitive_literal) = 
iceberg_default_value else {
-                        return Err(Error::new(
-                            ErrorKind::Unexpected,
-                            format!("Default value for column must be 
primitive type, but encountered {iceberg_default_value:?}")
-                        ));
-                    };
-                    Some(primitive_literal.clone())
-                } else {
-                    None
                 };
 
-                ColumnSource::Add {
-                    value: default_value,
-                    target_type: target_type.clone(),
-                }
+                Ok(column_source)
             })
-        }).collect()
+            .collect()
     }
 
     fn build_field_id_to_arrow_schema_map(
@@ -328,25 +485,19 @@ impl RecordBatchTransformer {
     ) -> Result<HashMap<i32, (FieldRef, usize)>> {
         let mut field_id_to_source_schema = HashMap::new();
         for (source_field_idx, source_field) in 
source_schema.fields.iter().enumerate() {
-            let this_field_id = source_field
-                .metadata()
-                .get(PARQUET_FIELD_ID_META_KEY)
-                .ok_or_else(|| {
+            // Check if field has a field ID in metadata
+            if let Some(field_id_str) = 
source_field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
+                let this_field_id = field_id_str.parse().map_err(|e| {
                     Error::new(
                         ErrorKind::DataInvalid,
-                        "field ID not present in parquet metadata",
-                    )
-                })?
-                .parse()
-                .map_err(|e| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("field id not parseable as an i32: {e}"),
+                        format!("field id not parseable as an i32: {}", e),
                     )
                 })?;
 
-            field_id_to_source_schema
-                .insert(this_field_id, (source_field.clone(), 
source_field_idx));
+                field_id_to_source_schema
+                    .insert(this_field_id, (source_field.clone(), 
source_field_idx));
+            }
+            // If field doesn't have a field ID, skip it - name mapping will 
handle it
         }
 
         Ok(field_id_to_source_schema)
@@ -447,7 +598,7 @@ impl RecordBatchTransformer {
             (dt, _) => {
                 return Err(Error::new(
                     ErrorKind::Unexpected,
-                    format!("unexpected target column type {dt}"),
+                    format!("unexpected target column type {}", dt),
                 ));
             }
         })
@@ -466,8 +617,10 @@ mod test {
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
-    use crate::arrow::record_batch_transformer::RecordBatchTransformer;
-    use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type};
+    use crate::arrow::record_batch_transformer::{
+        RecordBatchTransformer, RecordBatchTransformerBuilder,
+    };
+    use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, 
Type};
 
     #[test]
     fn build_field_id_to_source_schema_map_works() {
@@ -492,7 +645,9 @@ mod test {
         let snapshot_schema = Arc::new(iceberg_table_schema());
         let projected_iceberg_field_ids = [13, 14];
 
-        let mut inst = RecordBatchTransformer::build(snapshot_schema, 
&projected_iceberg_field_ids);
+        let mut inst =
+            RecordBatchTransformerBuilder::new(snapshot_schema, 
&projected_iceberg_field_ids)
+                .build();
 
         let result = inst
             .process_record_batch(source_record_batch_no_migration_required())
@@ -508,7 +663,9 @@ mod test {
         let snapshot_schema = Arc::new(iceberg_table_schema());
         let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, 
f
 
-        let mut inst = RecordBatchTransformer::build(snapshot_schema, 
&projected_iceberg_field_ids);
+        let mut inst =
+            RecordBatchTransformerBuilder::new(snapshot_schema, 
&projected_iceberg_field_ids)
+                .build();
 
         let result = inst.process_record_batch(source_record_batch()).unwrap();
 
@@ -537,7 +694,8 @@ mod test {
         let projected_iceberg_field_ids = [1, 2, 3];
 
         let mut transformer =
-            RecordBatchTransformer::build(snapshot_schema, 
&projected_iceberg_field_ids);
+            RecordBatchTransformerBuilder::new(snapshot_schema, 
&projected_iceberg_field_ids)
+                .build();
 
         let file_schema = Arc::new(ArrowSchema::new(vec![
             simple_field("id", DataType::Int32, false, "1"),
@@ -696,4 +854,592 @@ mod test {
             value.to_string(),
         )]))
     }
+
+    /// Test for add_files with Parquet files that have NO field IDs (Hive 
tables).
+    ///
+    /// This reproduces the scenario from Iceberg spec where:
+    /// - Hive-style partitioned Parquet files are imported via add_files 
procedure
+    /// - Parquet files originally DO NOT have field IDs (typical for Hive 
tables)
+    /// - ArrowReader applies name mapping to assign correct Iceberg field IDs
+    /// - Iceberg schema assigns field IDs: id (1), name (2), dept (3), 
subdept (4)
+    /// - Partition columns (id, dept) have initial_default values
+    ///
+    /// Per the Iceberg spec 
(https://iceberg.apache.org/spec/#column-projection),
+    /// this scenario requires `schema.name-mapping.default` from table 
metadata
+    /// to correctly map Parquet columns by name to Iceberg field IDs.
+    /// This mapping is now applied in ArrowReader before data is processed.
+    ///
+    /// Expected behavior:
+    /// 1. id=1 (from initial_default) - spec rule #3
+    /// 2. name="John Doe" (from Parquet with field_id=2 assigned by reader) - 
found by field ID
+    /// 3. dept="hr" (from initial_default) - spec rule #3
+    /// 4. subdept="communications" (from Parquet with field_id=4 assigned by 
reader) - found by field ID
+    #[test]
+    fn add_files_with_name_mapping_applied_in_reader() {
+        // Iceberg schema after add_files: id (partition), name, dept 
(partition), subdept
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    NestedField::optional(1, "id", 
Type::Primitive(PrimitiveType::Int))
+                        .with_initial_default(Literal::int(1))
+                        .into(),
+                    NestedField::optional(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(3, "dept", 
Type::Primitive(PrimitiveType::String))
+                        .with_initial_default(Literal::string("hr"))
+                        .into(),
+                    NestedField::optional(4, "subdept", 
Type::Primitive(PrimitiveType::String))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Simulate ArrowReader having applied name mapping:
+        // Original Parquet: name, subdept (NO field IDs)
+        // After reader.rs applies name mapping: name (field_id=2), subdept 
(field_id=4)
+        //
+        // Note: Partition columns (id, dept) are NOT in the Parquet file - 
they're in directory paths
+        use std::collections::HashMap;
+        let parquet_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("name", DataType::Utf8, 
true).with_metadata(HashMap::from([(
+                "PARQUET:field_id".to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("subdept", DataType::Utf8, 
true).with_metadata(HashMap::from([(
+                "PARQUET:field_id".to_string(),
+                "4".to_string(),
+            )])),
+        ]));
+
+        let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept
+
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, 
&projected_field_ids).build();
+
+        // Create a Parquet RecordBatch with data for: name="John Doe", 
subdept="communications"
+        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+            Arc::new(StringArray::from(vec!["John Doe"])),
+            Arc::new(StringArray::from(vec!["communications"])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+        // Verify the transformed RecordBatch has:
+        // - id=1 (from initial_default, not from Parquet)
+        // - name="John Doe" (from Parquet with correct field_id=2)
+        // - dept="hr" (from initial_default, not from Parquet)
+        // - subdept="communications" (from Parquet with correct field_id=4)
+        assert_eq!(result.num_columns(), 4);
+        assert_eq!(result.num_rows(), 1);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.value(0), 1);
+
+        let name_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_column.value(0), "John Doe");
+
+        let dept_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(dept_column.value(0), "hr");
+
+        let subdept_column = result
+            .column(3)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(subdept_column.value(0), "communications");
+    }
+
+    /// Test for bucket partitioning where source columns must be read from data files.
+    ///
+    /// This test verifies correct implementation of the Iceberg spec's "Column Projection" rules:
+    /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
+    ///
+    /// # Why this test is critical
+    ///
+    /// The key insight is that partition metadata stores TRANSFORMED values, not source values:
+    /// - For `bucket(4, id)`, partition metadata has `id_bucket = 2` (the bucket number)
+    /// - The actual `id` column values (100, 200, 300) are ONLY in the data file
+    ///
+    /// If iceberg-rust incorrectly treated bucket-partitioned fields as constants, it would:
+    /// 1. Replace all `id` values with the constant `2` from partition metadata
+    /// 2. Break runtime filtering (e.g., `WHERE id = 100` would match no rows)
+    /// 3. Return incorrect query results
+    ///
+    /// # What this test verifies
+    ///
+    /// - Bucket-partitioned fields (e.g., `bucket(4, id)`) are read from the data file
+    /// - The source column `id` contains actual values (100, 200, 300), not constants
+    /// - Java's `PartitionUtil.constantsMap()` behavior is correctly replicated:
+    ///   ```java
+    ///   if (field.transform().isIdentity()) {  // FALSE for bucket transforms
+    ///       idToConstant.put(field.sourceId(), converted);
+    ///   }
+    ///   ```
+    ///
+    /// # Real-world impact
+    ///
+    /// This reproduces the failure scenario from Iceberg Java's TestRuntimeFiltering:
+    /// - Tables partitioned by `bucket(N, col)` are common for load balancing
+    /// - Queries filter on the source column: `SELECT * FROM tbl WHERE col = value`
+    /// - Runtime filtering pushes predicates down to Iceberg file scans
+    /// - Without this fix, the filter would match against constant partition values instead of data
+    ///
+    /// # References
+    /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
+    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
+    #[test]
+    fn bucket_partitioning_reads_source_column_from_file() {
+        use crate::spec::{Struct, Transform};
+
+        // Table schema: id (data column), name (data column), id_bucket (partition column)
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Partition spec: bucket(4, id) - the id field is bucketed
+        let partition_spec = Arc::new(
+            crate::spec::PartitionSpec::builder(snapshot_schema.clone())
+                .with_spec_id(0)
+                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
+                .unwrap()
+                .build()
+                .unwrap(),
+        );
+
+        // Partition data: bucket value is 2
+        // In Iceberg, partition data is a Struct where each field corresponds to a partition field
+        let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);
+
+        // Parquet file contains both id and name columns
+        let parquet_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("name", DataType::Utf8, true, "2"),
+        ]));
+
+        let projected_field_ids = [1, 2]; // id, name
+
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_partition(partition_spec, partition_data)
+                .build();
+
+        // Create a Parquet RecordBatch with actual data
+        // The id column MUST be read from here, not treated as a constant
+        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+            Arc::new(Int32Array::from(vec![100, 200, 300])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+        // Verify the transformed RecordBatch correctly reads id from the file
+        // (NOT as a constant from partition metadata)
+        assert_eq!(result.num_columns(), 2);
+        assert_eq!(result.num_rows(), 3);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        // These values MUST come from the Parquet file, not be replaced by constants
+        assert_eq!(id_column.value(0), 100);
+        assert_eq!(id_column.value(1), 200);
+        assert_eq!(id_column.value(2), 300);
+
+        let name_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_column.value(0), "Alice");
+        assert_eq!(name_column.value(1), "Bob");
+        assert_eq!(name_column.value(2), "Charlie");
+    }
+
+    /// Test that identity-transformed partition fields ARE treated as constants.
+    ///
+    /// This is the complement to `bucket_partitioning_reads_source_column_from_file`,
+    /// verifying that constants_map() correctly identifies identity-transformed
+    /// partition fields per the Iceberg spec.
+    ///
+    /// # Spec requirement (format/spec.md "Column Projection")
+    ///
+    /// > "Return the value from partition metadata if an Identity Transform 
exists for the field
+    /// >  and the partition value is present in the `partition` struct on 
`data_file` object
+    /// >  in the manifest. This allows for metadata only migrations of Hive 
tables."
+    ///
+    /// # Why identity transforms use constants
+    ///
+    /// Unlike bucket/truncate/year/etc., identity transforms don't modify the value:
+    /// - `identity(dept)` stores the actual `dept` value in partition metadata
+    /// - Partition metadata has `dept = "engineering"` (the real value, not a hash/bucket)
+    /// - This value can be used directly without reading the data file
+    ///
+    /// # Performance benefit
+    ///
+    /// For Hive migrations where partition columns aren't in data files:
+    /// - Partition metadata provides the column values
+    /// - No need to read from data files (metadata-only query optimization)
+    /// - Common pattern: `dept=engineering/subdept=backend/file.parquet`
+    ///   - `dept` and `subdept` are in directory structure, not in `file.parquet`
+    ///   - Iceberg populates these from partition metadata as constants
+    ///
+    /// # What this test verifies
+    ///
+    /// - Identity-partitioned fields use constants from partition metadata
+    /// - The `dept` column is populated with `"engineering"` (not read from file)
+    /// - Java's `PartitionUtil.constantsMap()` behavior is matched:
+    ///   ```java
+    ///   if (field.transform().isIdentity()) {  // TRUE for identity
+    ///       idToConstant.put(field.sourceId(), converted);
+    ///   }
+    ///   ```
+    ///
+    /// # References
+    /// - Iceberg spec: format/spec.md "Column Projection"
+    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+    #[test]
+    fn identity_partition_uses_constant_from_metadata() {
+        use crate::spec::{Struct, Transform};
+
+        // Table schema: id (data column), dept (partition column), name (data column)
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "dept", 
Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(3, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Partition spec: identity(dept) - the dept field uses identity transform
+        let partition_spec = Arc::new(
+            crate::spec::PartitionSpec::builder(snapshot_schema.clone())
+                .with_spec_id(0)
+                .add_partition_field("dept", "dept", Transform::Identity)
+                .unwrap()
+                .build()
+                .unwrap(),
+        );
+
+        // Partition data: dept="engineering"
+        let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]);
+
+        // Parquet file contains only id and name (dept is in partition path)
+        let parquet_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("name", DataType::Utf8, true, "3"),
+        ]));
+
+        let projected_field_ids = [1, 2, 3]; // id, dept, name
+
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_partition(partition_spec, partition_data)
+                .build();
+
+        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+            Arc::new(Int32Array::from(vec![100, 200])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+        // Verify the dept column is populated with the constant from partition metadata
+        assert_eq!(result.num_columns(), 3);
+        assert_eq!(result.num_rows(), 2);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.value(0), 100);
+        assert_eq!(id_column.value(1), 200);
+
+        let dept_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        // This value MUST come from partition metadata (constant)
+        assert_eq!(dept_column.value(0), "engineering");
+        assert_eq!(dept_column.value(1), "engineering");
+
+        let name_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_column.value(0), "Alice");
+        assert_eq!(name_column.value(1), "Bob");
+    }
+
+    /// Test bucket partitioning with renamed source column.
+    ///
+    /// This verifies correct behavior for TestRuntimeFiltering.testRenamedSourceColumnTable() in Iceberg Java.
+    /// When a source column is renamed after partitioning is established, field-ID-based mapping
+    /// must still correctly identify the column in Parquet files.
+    ///
+    /// # Scenario
+    ///
+    /// 1. Table created with `bucket(4, id)` partitioning
+    /// 2. Data written to Parquet files (field_id=1, name="id")
+    /// 3. Column renamed: `ALTER TABLE ... RENAME COLUMN id TO row_id`
+    /// 4. Iceberg schema now has: field_id=1, name="row_id"
+    /// 5. Parquet files still have: field_id=1, name="id"
+    ///
+    /// # Expected Behavior Per Iceberg Spec
+    ///
+    /// Per the Iceberg spec "Column Projection" section and Java's 
PartitionUtil.constantsMap():
+    /// - Bucket transforms are NON-identity, so partition metadata stores 
bucket numbers (0-3), not source values
+    /// - Source columns for non-identity transforms MUST be read from data 
files
+    /// - Field-ID-based mapping should find the column by field_id=1 
(ignoring name mismatch)
+    /// - Runtime filtering on `row_id` should work correctly
+    ///
+    /// # What This Tests
+    ///
+    /// This test ensures that when FileScanTask provides partition_spec and partition_data:
+    /// - constants_map() correctly identifies that bucket(4, row_id) is NOT an identity transform
+    /// - The source column (field_id=1) is NOT added to constants_map
+    /// - Field-ID-based mapping reads actual values from the Parquet file
+    /// - Values [100, 200, 300] are read, not replaced with bucket constant 2
+    ///
+    /// # References
+    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java::testRenamedSourceColumnTable
+    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java::constantsMap()
+    /// - Iceberg spec: format/spec.md "Column Projection" section
+    #[test]
+    fn test_bucket_partitioning_with_renamed_source_column() {
+        use crate::spec::{Struct, Transform};
+
+        // Iceberg schema after rename: row_id (was id), name
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    NestedField::required(1, "row_id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Partition spec: bucket(4, row_id) - but source_id still points to field_id=1
+        let partition_spec = Arc::new(
+            crate::spec::PartitionSpec::builder(snapshot_schema.clone())
+                .with_spec_id(0)
+                .add_partition_field("row_id", "row_id_bucket", 
Transform::Bucket(4))
+                .unwrap()
+                .build()
+                .unwrap(),
+        );
+
+        // Partition data: bucket value is 2
+        let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);
+
+        // Parquet file has OLD column name "id" but SAME field_id=1
+        // Field-ID-based mapping should find this despite name mismatch
+        let parquet_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("name", DataType::Utf8, true, "2"),
+        ]));
+
+        let projected_field_ids = [1, 2]; // row_id (field_id=1), name (field_id=2)
+
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_partition(partition_spec, partition_data)
+                .build();
+
+        // Create a Parquet RecordBatch with actual data
+        // Despite column rename, data should be read via field_id=1
+        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+            Arc::new(Int32Array::from(vec![100, 200, 300])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+        // Verify the transformed RecordBatch correctly reads data despite name mismatch
+        assert_eq!(result.num_columns(), 2);
+        assert_eq!(result.num_rows(), 3);
+
+        let row_id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        // These values MUST come from the Parquet file via field_id=1,
+        // not be replaced by the bucket constant (2)
+        assert_eq!(row_id_column.value(0), 100);
+        assert_eq!(row_id_column.value(1), 200);
+        assert_eq!(row_id_column.value(2), 300);
+
+        let name_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_column.value(0), "Alice");
+        assert_eq!(name_column.value(1), "Bob");
+        assert_eq!(name_column.value(2), "Charlie");
+    }
+
+    /// Comprehensive integration test that verifies all 4 Iceberg spec rules work correctly.
+    ///
+    /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection),
+    /// "Values for field ids which are not present in a data file must be resolved
+    /// according the following rules:"
+    ///
+    /// This test creates a scenario where each rule is exercised:
+    /// - Rule #1: dept (identity-partitioned) -> constant from partition metadata
+    /// - Rule #2: data (via name mapping) -> read from Parquet file by name
+    /// - Rule #3: category (initial_default) -> use default value
+    /// - Rule #4: notes (no default) -> return null
+    ///
+    /// # References
+    /// - Iceberg spec: format/spec.md "Column Projection" section
+    #[test]
+    fn test_all_four_spec_rules() {
+        use crate::spec::Transform;
+
+        // Iceberg schema with columns designed to exercise each spec rule
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    // Field in Parquet by field ID (normal case)
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    // Rule #1: Identity-partitioned field - should use partition metadata
+                    NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(),
+                    // Rule #2: Field resolved by name mapping (ArrowReader already applied)
+                    NestedField::required(3, "data", Type::Primitive(PrimitiveType::String)).into(),
+                    // Rule #3: Field with initial_default
+                    NestedField::optional(4, "category", 
Type::Primitive(PrimitiveType::String))
+                        
.with_initial_default(Literal::string("default_category"))
+                        .into(),
+                    // Rule #4: Field with no default - should be null
+                    NestedField::optional(5, "notes", 
Type::Primitive(PrimitiveType::String))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Partition spec: identity transform on dept
+        let partition_spec = Arc::new(
+            crate::spec::PartitionSpec::builder(snapshot_schema.clone())
+                .with_spec_id(0)
+                .add_partition_field("dept", "dept", Transform::Identity)
+                .unwrap()
+                .build()
+                .unwrap(),
+        );
+
+        // Partition data: dept="engineering"
+        let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]);
+
+        // Parquet schema: simulates post-ArrowReader state where name mapping already applied
+        // Has id (field_id=1) and data (field_id=3, assigned by ArrowReader via name mapping)
+        // Missing: dept (in partition), category (has default), notes (no default)
+        let parquet_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("data", DataType::Utf8, false, "3"),
+        ]));
+
+        let projected_field_ids = [1, 2, 3, 4, 5]; // id, dept, data, category, notes
+
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_partition(partition_spec, partition_data)
+                .build();
+
+        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+            Arc::new(Int32Array::from(vec![100, 200])),
+            Arc::new(StringArray::from(vec!["value1", "value2"])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+        assert_eq!(result.num_columns(), 5);
+        assert_eq!(result.num_rows(), 2);
+
+        // Verify each column demonstrates the correct spec rule:
+
+        // Normal case: id from Parquet by field ID
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.value(0), 100);
+        assert_eq!(id_column.value(1), 200);
+
+        // Rule #1: dept from partition metadata (identity transform)
+        let dept_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(dept_column.value(0), "engineering");
+        assert_eq!(dept_column.value(1), "engineering");
+
+        // Rule #2: data from Parquet via name mapping
+        let data_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(data_column.value(0), "value1");
+        assert_eq!(data_column.value(1), "value2");
+
+        // Rule #3: category from initial_default
+        let category_column = result
+            .column(3)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(category_column.value(0), "default_category");
+        assert_eq!(category_column.value(1), "default_category");
+
+        // Rule #4: notes is null (no default, not in Parquet, not in partition)
+        let notes_column = result
+            .column(4)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert!(notes_column.is_null(0));
+        assert!(notes_column.is_null(1));
+    }
 }
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 3f7c29db..fe3f5c8f 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -128,6 +128,13 @@ impl ManifestEntryContext {
                 .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
 
             deletes,
+
+            // Include partition data and spec from manifest entry
+            partition: Some(self.manifest_entry.data_file.partition.clone()),
+            // TODO: Pass actual PartitionSpec through context chain for native flow
+            partition_spec: None,
+            // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
+            name_mapping: None,
         })
     }
 }
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 6884e00b..3e319ca0 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -1777,6 +1777,9 @@ pub mod tests {
             record_count: Some(100),
             data_file_format: DataFileFormat::Parquet,
             deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
         test_fn(task);
 
@@ -1791,6 +1794,9 @@ pub mod tests {
             record_count: None,
             data_file_format: DataFileFormat::Avro,
             deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
         test_fn(task);
     }
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 32fe3ae3..e1ef241a 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -15,16 +15,39 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use futures::stream::BoxStream;
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Serialize, Serializer};
 
 use crate::Result;
 use crate::expr::BoundPredicate;
-use crate::spec::{DataContentType, DataFileFormat, ManifestEntryRef, Schema, SchemaRef};
+use crate::spec::{
+    DataContentType, DataFileFormat, ManifestEntryRef, NameMapping, PartitionSpec, Schema,
+    SchemaRef, Struct,
+};
 
 /// A stream of [`FileScanTask`].
 pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
 
+/// Serialization helper that always returns NotImplementedError.
+/// Used for fields that should not be serialized but we want to be explicit about it.
+fn serialize_not_implemented<S, T>(_: &T, _: S) -> std::result::Result<S::Ok, S::Error>
+where S: Serializer {
+    Err(serde::ser::Error::custom(
+        "Serialization not implemented for this field",
+    ))
+}
+
+/// Deserialization helper that always returns NotImplementedError.
+/// Used for fields that should not be deserialized but we want to be explicit about it.
+fn deserialize_not_implemented<'de, D, T>(_: D) -> std::result::Result<T, D::Error>
+where D: serde::Deserializer<'de> {
+    Err(serde::de::Error::custom(
+        "Deserialization not implemented for this field",
+    ))
+}
+
 /// A task to scan part of file.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct FileScanTask {
@@ -54,6 +77,33 @@ pub struct FileScanTask {
 
     /// The list of delete files that may need to be applied to this data file
     pub deletes: Vec<FileScanTaskDeleteFile>,
+
+    /// Partition data from the manifest entry, used to identify which columns can use
+    /// constant values from partition metadata vs. reading from the data file.
+    /// Per the Iceberg spec, only identity-transformed partition fields should use constants.
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "serialize_not_implemented")]
+    #[serde(deserialize_with = "deserialize_not_implemented")]
+    pub partition: Option<Struct>,
+
+    /// The partition spec for this file, used to distinguish identity transforms
+    /// (which use partition metadata constants) from non-identity transforms like
+    /// bucket/truncate (which must read source columns from the data file).
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "serialize_not_implemented")]
+    #[serde(deserialize_with = "deserialize_not_implemented")]
+    pub partition_spec: Option<Arc<PartitionSpec>>,
+
+    /// Name mapping from table metadata (property: schema.name-mapping.default),
+    /// used to resolve field IDs from column names when Parquet files lack field IDs
+    /// or have field ID conflicts.
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "serialize_not_implemented")]
+    #[serde(deserialize_with = "deserialize_not_implemented")]
+    pub name_mapping: Option<Arc<NameMapping>>,
 }
 
 impl FileScanTask {

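For readers skimming the diff, the constant-detection rule the new tests exercise can be stated in isolation. The following is a minimal, self-contained sketch of the identity-only rule described in the doc comments above; the `Transform`/`PartitionField` types and the `constants_map` function here are simplified stand-ins for illustration, not the crate's actual `PartitionSpec`/`Struct` API:

```rust
// Sketch of the identity-only constants rule (simplified stand-in types,
// NOT the iceberg-rust crate's real API).
use std::collections::HashMap;

enum Transform {
    Identity,
    Bucket(u32),
}

struct PartitionField {
    source_id: i32,       // field id of the source column in the table schema
    transform: Transform, // how the source value was transformed for partitioning
}

/// Returns source-field-id -> constant for identity transforms only.
/// Bucket/truncate/etc. are skipped, so their source columns are read from the data file.
fn constants_map(
    fields: &[PartitionField],
    partition_values: &[Option<String>], // one value per partition field, as stored in the manifest
) -> HashMap<i32, String> {
    fields
        .iter()
        .zip(partition_values)
        .filter_map(|(field, value)| match (&field.transform, value) {
            (Transform::Identity, Some(v)) => Some((field.source_id, v.clone())),
            // Non-identity transforms: the stored value is a bucket number (or similar),
            // not the source value, so no constant is produced.
            _ => None,
        })
        .collect()
}

fn main() {
    // identity(dept) -> constant; bucket(4, id) -> no constant, read `id` from the file.
    let fields = vec![
        PartitionField { source_id: 2, transform: Transform::Identity },
        PartitionField { source_id: 1, transform: Transform::Bucket(4) },
    ];
    let partition = vec![Some("engineering".to_string()), Some("2".to_string())];
    let constants = constants_map(&fields, &partition);
    assert_eq!(constants.get(&2), Some(&"engineering".to_string()));
    assert!(constants.get(&1).is_none());
}
```

In the commit itself, the same decision is driven by the `partition_spec` and `partition` fields added to `FileScanTask`, which the `RecordBatchTransformerBuilder::with_partition` calls in the tests above pass through to the transformer.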