This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ed913bc Read partitioned tables with source field missing (#2367)
8ed913bc is described below

commit 8ed913bc48b19460c47746273b4678e7d2e6af70
Author: Gabriel Igliozzi <[email protected]>
AuthorDate: Thu Dec 4 16:53:56 2025 +0100

    Read partitioned tables with source field missing (#2367)
    
    <!--
    Thanks for opening a pull request!
    -->
    
    <!-- In the case this PR will resolve an issue, please replace
    ${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
    <!-- Closes #${GITHUB_ISSUE_ID} -->
    
    # Rationale for this change
    
    Following with the Java
    [solution](https://github.com/apache/iceberg/pull/11868/files)
    implementation on how to read partition specs when a source field was
    dropped.
    
    # Are these changes tested?
    
    Yes, added one integration test and one unit test
    
    # Are there any user-facing changes?
    
    No
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
---
 pyiceberg/partitioning.py        | 12 ++++++++----
 tests/integration/test_reads.py  | 16 ++++++++++++++++
 tests/table/test_partitioning.py | 36 ++++++++++++++++++++++++++++++++++++
 3 files changed, 60 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index 45d0dfd2..8bf2b817 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -56,6 +56,7 @@ from pyiceberg.types import (
     TimestampType,
     TimestamptzType,
     TimeType,
+    UnknownType,
     UUIDType,
 )
 from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, 
time_to_micros
@@ -222,11 +223,14 @@ class PartitionSpec(IcebergBaseModel):
         :return: A StructType that represents the PartitionSpec, with a 
NestedField for each PartitionField.
         """
         nested_fields = []
+        schema_ids = schema._lazy_id_to_field
         for field in self.fields:
-            source_type = schema.find_type(field.source_id)
-            result_type = field.transform.result_type(source_type)
-            required = schema.find_field(field.source_id).required
-            nested_fields.append(NestedField(field.field_id, field.name, 
result_type, required=required))
+            if source_field := schema_ids.get(field.source_id):
+                result_type = 
field.transform.result_type(source_field.field_type)
+                nested_fields.append(NestedField(field.field_id, field.name, 
result_type, required=source_field.required))
+            else:
+                # Since the source field has been dropped we cannot determine the 
type
+                nested_fields.append(NestedField(field.field_id, field.name, 
UnknownType()))
         return StructType(*nested_fields)
 
     def partition_to_path(self, data: Record, schema: Schema) -> str:
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 99116ad1..785037ae 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -1083,3 +1083,19 @@ def test_filter_after_arrow_scan(catalog: Catalog) -> 
None:
 
     scan = scan.filter("ts >= '2023-03-05T00:00:00+00:00'")
     assert len(scan.to_arrow()) > 0
+
+
[email protected]
[email protected]("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_scan_source_field_missing_in_spec(catalog: Catalog, spark: 
SparkSession) -> None:
+    identifier = "default.test_dropped_field"
+    spark.sql(f"DROP TABLE IF EXISTS {identifier}")
+    spark.sql(f"CREATE TABLE {identifier} (foo int, bar int, jaz string) USING 
ICEBERG PARTITIONED BY (foo, bar)")
+    spark.sql(
+        f"INSERT INTO {identifier} (foo, bar, jaz) VALUES (1, 1, 'dummy 
data'), (1, 2, 'dummy data again'), (2, 1, 'another partition')"
+    )
+    spark.sql(f"ALTER TABLE {identifier} DROP PARTITION FIELD foo")
+    spark.sql(f"ALTER TABLE {identifier} DROP COLUMN  foo")
+
+    table = catalog.load_table(identifier)
+    assert len(list(table.scan().plan_files())) == 3
diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py
index 0fe22391..576297c6 100644
--- a/tests/table/test_partitioning.py
+++ b/tests/table/test_partitioning.py
@@ -47,6 +47,7 @@ from pyiceberg.types import (
     TimestampType,
     TimestamptzType,
     TimeType,
+    UnknownType,
     UUIDType,
 )
 
@@ -165,6 +166,28 @@ def test_partition_spec_to_path() -> None:
     assert spec.partition_to_path(record, schema) == 
"my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10"
 
 
+def test_partition_spec_to_path_dropped_source_id() -> None:
+    schema = Schema(
+        NestedField(field_id=1, name="str", field_type=StringType(), 
required=False),
+        NestedField(field_id=2, name="other_str", field_type=StringType(), 
required=False),
+        NestedField(field_id=3, name="int", field_type=IntegerType(), 
required=True),
+    )
+
+    spec = PartitionSpec(
+        PartitionField(source_id=1, field_id=1000, 
transform=TruncateTransform(width=19), name="my#str%bucket"),
+        PartitionField(source_id=2, field_id=1001, 
transform=IdentityTransform(), name="other str+bucket"),
+        # Point partition field to missing source id
+        PartitionField(source_id=4, field_id=1002, 
transform=BucketTransform(num_buckets=25), name="my!int:bucket"),
+        spec_id=3,
+    )
+
+    record = Record("my+str", "( )", 10)
+
+    # Both partition field names and values should be URL encoded, with spaces 
mapping to plus signs, to match the Java
+    # behaviour: 
https://github.com/apache/iceberg/blob/ca3db931b0f024f0412084751ac85dd4ef2da7e7/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L198-L204
+    assert spec.partition_to_path(record, schema) == 
"my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10"
+
+
 def test_partition_type(table_schema_simple: Schema) -> None:
     spec = PartitionSpec(
         PartitionField(source_id=1, field_id=1000, 
transform=TruncateTransform(width=19), name="str_truncate"),
@@ -178,6 +201,19 @@ def test_partition_type(table_schema_simple: Schema) -> 
None:
     )
 
 
+def test_partition_type_missing_source_field(table_schema_simple: Schema) -> 
None:
+    spec = PartitionSpec(
+        PartitionField(source_id=1, field_id=1000, 
transform=TruncateTransform(width=19), name="str_truncate"),
+        PartitionField(source_id=10, field_id=1001, 
transform=BucketTransform(num_buckets=25), name="int_bucket"),
+        spec_id=3,
+    )
+
+    assert spec.partition_type(table_schema_simple) == StructType(
+        NestedField(field_id=1000, name="str_truncate", 
field_type=StringType(), required=False),
+        NestedField(field_id=1001, name="int_bucket", 
field_type=UnknownType(), required=False),
+    )
+
+
 @pytest.mark.parametrize(
     "source_type, value",
     [

Reply via email to