This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d549a9e Fix partition column projection with schema evolution (#2685)
2d549a9e is described below

commit 2d549a9e67b44901c1677d519276372de9fa1d6c
Author: Kevin Jiao <[email protected]>
AuthorDate: Mon Nov 3 14:59:49 2025 -0800

    Fix partition column projection with schema evolution (#2685)
    
    Closes #2672
    
      # Rationale for this change
    
    When performing column projection on partitioned tables with schema
    evolution, PyIceberg incorrectly uses the projected schema (containing
    only selected columns) instead
    of the full table schema when building partition types in
    `_get_column_projection_values()`. This causes `ValueError: Could not
    find field with id: X` when:
    
      1. Reading from partitioned Iceberg tables
      2. Using column projection (selecting specific columns, not `SELECT *`)
      3. Selected columns do NOT include the partition field(s)
      4. The table has undergone schema evolution (fields added/removed after
         initial creation)
      5. Reading files that are missing some of the selected columns (written
         before schema evolution)
    
    The root cause is that
    `partition_spec.partition_type(projected_schema)` fails because the
    projected schema may be missing fields that exist in the partition
    specification.
    
    The fix passes the full table schema from
    `ArrowScan._table_metadata.schema()` through `_task_to_record_batches()`
    to `_get_column_projection_values()`, ensuring all fields are available
    when building partition accessors.
    
      ## Are these changes tested?
    
    Yes. Added a test
    `test_partition_column_projection_with_schema_evolution` that:
      - Creates a partitioned table with initial schema
      - Writes data with the initial schema
      - Evolves the schema by adding a new column
      - Writes data with the evolved schema
      - Performs column projection that excludes the partition field
    
      ## Are there any user-facing changes?
    
      No. Only internal helpers are changed.
---
 pyiceberg/io/pyarrow.py  | 12 ++++++---
 tests/io/test_pyarrow.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 7710df76..d1484a83 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1492,14 +1492,18 @@ class _ConvertToIcebergWithoutIDs(_ConvertToIceberg):
 
 
 def _get_column_projection_values(
-    file: DataFile, projected_schema: Schema, partition_spec: 
Optional[PartitionSpec], file_project_field_ids: Set[int]
+    file: DataFile,
+    projected_schema: Schema,
+    table_schema: Schema,
+    partition_spec: Optional[PartitionSpec],
+    file_project_field_ids: Set[int],
 ) -> Dict[int, Any]:
     """Apply Column Projection rules to File Schema."""
     project_schema_diff = 
projected_schema.field_ids.difference(file_project_field_ids)
     if len(project_schema_diff) == 0 or partition_spec is None:
         return EMPTY_DICT
 
-    partition_schema = partition_spec.partition_type(projected_schema)
+    partition_schema = partition_spec.partition_type(table_schema)
     accessors = build_position_accessors(partition_schema)
 
     projected_missing_fields = {}
@@ -1517,6 +1521,7 @@ def _task_to_record_batches(
     task: FileScanTask,
     bound_row_filter: BooleanExpression,
     projected_schema: Schema,
+    table_schema: Schema,
     projected_field_ids: Set[int],
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
@@ -1541,7 +1546,7 @@ def _task_to_record_batches(
 
         # Apply column projection rules: 
https://iceberg.apache.org/spec/#column-projection
         projected_missing_fields = _get_column_projection_values(
-            task.file, projected_schema, partition_spec, file_schema.field_ids
+            task.file, projected_schema, table_schema, partition_spec, 
file_schema.field_ids
         )
 
         pyarrow_filter = None
@@ -1763,6 +1768,7 @@ class ArrowScan:
                 task,
                 self._bound_row_filter,
                 self._projected_schema,
+                self._table_metadata.schema(),
                 self._projected_field_ids,
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 45b9d9c9..cd3f9c80 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2846,6 +2846,7 @@ def test_task_to_record_batches_nanos(format_version: 
TableVersion, tmpdir: str)
             FileScanTask(data_file),
             bound_row_filter=AlwaysTrue(),
             projected_schema=table_schema,
+            table_schema=table_schema,
             projected_field_ids={1},
             positional_deletes=None,
             case_sensitive=True,
@@ -4590,3 +4591,72 @@ def test_orc_stripe_based_batching(tmp_path: Path) -> 
None:
         # Verify total rows
         total_rows = sum(batch.num_rows for batch in batches)
         assert total_rows == 10000, f"Expected 10000 total rows, got 
{total_rows}"
+
+
+def test_partition_column_projection_with_schema_evolution(catalog: 
InMemoryCatalog) -> None:
+    """Test column projection on partitioned table after schema evolution 
(https://github.com/apache/iceberg-python/issues/2672)."""
+    initial_schema = Schema(
+        NestedField(1, "partition_date", DateType(), required=False),
+        NestedField(2, "id", IntegerType(), required=False),
+        NestedField(3, "name", StringType(), required=False),
+        NestedField(4, "value", IntegerType(), required=False),
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=1, field_id=1000, 
transform=IdentityTransform(), name="partition_date"),
+    )
+
+    catalog.create_namespace("default")
+    table = catalog.create_table(
+        "default.test_schema_evolution_projection",
+        schema=initial_schema,
+        partition_spec=partition_spec,
+    )
+
+    data_v1 = pa.Table.from_pylist(
+        [
+            {"partition_date": date(2024, 1, 1), "id": 1, "name": "Alice", 
"value": 100},
+            {"partition_date": date(2024, 1, 1), "id": 2, "name": "Bob", 
"value": 200},
+        ],
+        schema=pa.schema(
+            [
+                ("partition_date", pa.date32()),
+                ("id", pa.int32()),
+                ("name", pa.string()),
+                ("value", pa.int32()),
+            ]
+        ),
+    )
+
+    table.append(data_v1)
+
+    with table.update_schema() as update:
+        update.add_column("new_column", StringType())
+
+    table = catalog.load_table("default.test_schema_evolution_projection")
+
+    data_v2 = pa.Table.from_pylist(
+        [
+            {"partition_date": date(2024, 1, 2), "id": 3, "name": "Charlie", 
"value": 300, "new_column": "new1"},
+            {"partition_date": date(2024, 1, 2), "id": 4, "name": "David", 
"value": 400, "new_column": "new2"},
+        ],
+        schema=pa.schema(
+            [
+                ("partition_date", pa.date32()),
+                ("id", pa.int32()),
+                ("name", pa.string()),
+                ("value", pa.int32()),
+                ("new_column", pa.string()),
+            ]
+        ),
+    )
+
+    table.append(data_v2)
+
+    result = table.scan(selected_fields=("id", "name", "value", 
"new_column")).to_arrow()
+
+    assert set(result.schema.names) == {"id", "name", "value", "new_column"}
+    assert result.num_rows == 4
+    result_sorted = result.sort_by("name")
+    assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", 
"David"]
+    assert result_sorted["new_column"].to_pylist() == [None, None, "new1", 
"new2"]

Reply via email to