This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 454c17f5 Remove `field-id` constraint on `add_files` (#2662)
454c17f5 is described below

commit 454c17f56f0f4c117c62f298bfade38f2ff67b6f
Author: jeroko <[email protected]>
AuthorDate: Mon Dec 8 11:18:37 2025 +0100

    Remove `field-id` constraint on `add_files` (#2662)
    
    # Rationale for this change
    Closes #2131
    
    This PR relaxes the constraint that prevented adding any file with field
    IDs, replacing it with a check that rejects files whose field IDs are
    inconsistent with the field IDs of the table. If the field IDs are
    compatible, the files can be added safely; if not, they are rejected.
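    
    For illustration, a minimal sketch of the relaxed behaviour (the catalog
    name, table identifier, and file path below are hypothetical; only the
    field-ID handling reflects this change):
    
    ```python
    import pyarrow as pa
    import pyarrow.parquet as pq
    from pyiceberg.catalog import load_catalog
    
    # Hypothetical catalog and table; any table reachable by the configured
    # FileIO would do.
    catalog = load_catalog("default")
    tbl = catalog.load_table("default.my_table")
    
    # Parquet schema that carries field IDs; the IDs must line up with the
    # Iceberg table schema (matching IDs are what is checked, not names).
    schema_with_ids = pa.schema(
        [
            pa.field("foo", pa.bool_(), metadata={"PARQUET:field_id": "1"}),
            pa.field("bar", pa.string(), metadata={"PARQUET:field_id": "2"}),
        ]
    )
    pq.write_table(
        pa.table({"foo": [True], "bar": ["x"]}, schema=schema_with_ids),
        "/tmp/with_field_ids.parquet",
    )
    
    # Previously this raised NotImplementedError for any file with field IDs.
    # Now it succeeds when the IDs are consistent with the table schema and
    # raises ValueError when they are not.
    tbl.add_files(file_paths=["/tmp/with_field_ids.parquet"])
    ```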
    
    ## Are these changes tested?
    Yes
    
    ## Are there any user-facing changes?
    Yes
---
 mkdocs/docs/api.md                  |  6 ++-
 pyiceberg/io/pyarrow.py             |  5 +-
 tests/integration/test_add_files.py | 93 ++++++++++++++++++++++++++++++++++---
 3 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index c36c24bc..5d43a940 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -1006,8 +1006,10 @@ Expert Iceberg users may choose to commit existing parquet files to the Iceberg
 
 <!-- prettier-ignore-start -->
 
-!!! note "Name Mapping"
-    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current sc [...]
+!!! note "Name Mapping and Field IDs"
+    `add_files` can work with Parquet files both with and without field IDs in their metadata:
+    - **Files with field IDs**: When field IDs are present in the Parquet metadata, they must match the corresponding field IDs in the Iceberg table schema. This is common for files generated by tools like Spark or when using other libraries that write explicit field ID metadata.
+    - **Files without field IDs**: When field IDs are absent, the table must have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) to map field names to Iceberg field IDs. `add_files` will automatically create a Name Mapping based on the table's current schema if one doesn't already exist.
 
 !!! note "Partitions"
     `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). Please note that if the column statistics of the `PartitionField`'s sour [...]
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index d98e3fa7..1077f41f 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2708,6 +2708,7 @@ def _check_pyarrow_schema_compatible(
         ValueError: If the schemas are not compatible.
     """
     name_mapping = requested_schema.name_mapping
+
     try:
         provided_schema = pyarrow_to_schema(
             provided_schema,
@@ -2738,10 +2739,6 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
         parquet_metadata = pq.read_metadata(input_stream)
 
     arrow_schema = parquet_metadata.schema.to_arrow_schema()
-    if visit_pyarrow(arrow_schema, _HasIds()):
-        raise NotImplementedError(
-            f"Cannot add file {file_path} because it has field IDs. 
`add_files` only supports addition of files without field_ids"
-        )
 
     schema = table_metadata.schema()
     _check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py
index e78d4dfb..86ef05e5 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -216,14 +216,14 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found(
 
 
 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table_raises_has_field_ids(
+def test_add_files_to_unpartitioned_table_with_field_ids(
     spark: SparkSession, session_catalog: Catalog, format_version: int
 ) -> None:
-    identifier = f"default.unpartitioned_raises_field_ids_v{format_version}"
+    identifier = f"default.unpartitioned_with_field_ids_v{format_version}"
     tbl = _create_table(session_catalog, identifier, format_version)
 
-    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
-    # write parquet files
+    file_paths = [f"s3://warehouse/default/unpartitioned_with_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
+    # write parquet files with field IDs matching the table schema
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
         with fo.create(overwrite=True) as fos:
@@ -231,8 +231,30 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids(
                 writer.write_table(ARROW_TABLE_WITH_IDS)
 
     # add the parquet files as data files
-    with pytest.raises(NotImplementedError):
-        tbl.add_files(file_paths=file_paths)
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping should still be set even though files have field IDs
+    assert tbl.name_mapping() is not None
+
+    # Verify files were added successfully
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    # Verify data can be read back correctly
+    df = spark.table(identifier).toPandas()
+    assert len(df) == 5
+    assert all(df["foo"] == True)  # noqa: E712
+    assert all(df["bar"] == "bar_string")
+    assert all(df["baz"] == 123)
+    assert all(df["qux"] == date(2024, 3, 7))
 
 
 @pytest.mark.integration
@@ -579,6 +601,65 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog
         tbl.add_files(file_paths=[file_path])
 
 
+@pytest.mark.integration
+def test_add_files_with_field_ids_fails_on_schema_mismatch(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    """Test that files with mismatched field types (when field IDs match) are 
rejected."""
+    identifier = f"default.table_schema_mismatch_based_on_field_ids__fails_v{format_version}"
+
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    # All fields are renamed and reordered but have matching field IDs, so they should be compatible
+    # except for 'baz' which has the wrong type
+    WRONG_SCHEMA = pa.schema(
+        [
+            pa.field("qux_", pa.date32(), metadata={"PARQUET:field_id": "4"}),
+            pa.field("baz_", pa.string(), metadata={"PARQUET:field_id": "3"}), 
 # Wrong type: should be int32
+            pa.field("bar_", pa.string(), metadata={"PARQUET:field_id": "2"}),
+            pa.field("foo_", pa.bool_(), metadata={"PARQUET:field_id": "1"}),
+        ]
+    )
+    file_path = f"s3://warehouse/default/table_with_field_ids_schema_mismatch_fails/v{format_version}/test.parquet"
+    # write parquet files
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=WRONG_SCHEMA) as writer:
+            writer.write_table(
+                pa.Table.from_pylist(
+                    [
+                        {
+                            "qux_": date(2024, 3, 7),
+                            "baz_": "123",
+                            "bar_": "bar_string",
+                            "foo_": True,
+                        },
+                        {
+                            "qux_": date(2024, 3, 7),
+                            "baz_": "124",
+                            "bar_": "bar_string",
+                            "foo_": True,
+                        },
+                    ],
+                    schema=WRONG_SCHEMA,
+                )
+            )
+
+    expected = """Mismatch in fields:
+┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
+┃    ┃ Table field              ┃ Dataframe field           ┃
+┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
+│ ✅ │ 1: foo: optional boolean │ 1: foo_: optional boolean │
+│ ✅ │ 2: bar: optional string  │ 2: bar_: optional string  │
+│ ❌ │ 3: baz: optional int     │ 3: baz_: optional string  │
+│ ✅ │ 4: qux: optional date    │ 4: qux_: optional date    │
+└────┴──────────────────────────┴───────────────────────────┘
+"""
+
+    with pytest.raises(ValueError, match=expected):
+        tbl.add_files(file_paths=[file_path])
+
+
 @pytest.mark.integration
 def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
     identifier = f"default.unpartitioned_with_large_types{format_version}"
