This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new b6a45ed7 fix: sanitize invalid Avro field names in manifest file 
(#2245)
b6a45ed7 is described below

commit b6a45ed70676e1f0e832fa820a4d8b4b0bd42d23
Author: Kristofer Gaudel <[email protected]>
AuthorDate: Wed Aug 6 17:12:32 2025 -0400

    fix: sanitize invalid Avro field names in manifest file (#2245)
    
    <!--
    Thanks for opening a pull request!
    -->
    
    <!-- In the case this PR will resolve an issue, please replace
    ${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
    <!-- Closes #${GITHUB_ISSUE_ID} -->
    Closes https://github.com/apache/iceberg-python/issues/2123
    
    # Rationale for this change
    Fixing sanitization behaviour to match specification and Java
    implementation
    
    # Are these changes tested?
    Yes - Unit and integration tests
    
    # Are there any user-facing changes?
    Yes - Field names will be sanitized to be Avro compatible if not already
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
    
    ---------
    
    Co-authored-by: Kevin Liu <[email protected]>
---
 pyiceberg/schema.py                          |  22 ++-
 pyiceberg/utils/schema_conversion.py         |  29 ++-
 tests/integration/test_writes/test_writes.py | 131 +++++++++++++
 tests/test_avro_sanitization.py              | 269 +++++++++++++++++++++++++++
 4 files changed, 442 insertions(+), 9 deletions(-)

diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py
index 6333ace6..1eadc583 100644
--- a/pyiceberg/schema.py
+++ b/pyiceberg/schema.py
@@ -78,6 +78,9 @@ P = TypeVar("P")
 
 INITIAL_SCHEMA_ID = 0
 
+FIELD_ID_PROP = "field-id"
+ICEBERG_FIELD_NAME_PROP = "iceberg-field-name"
+
 
 class Schema(IcebergBaseModel):
     """A table Schema.
@@ -1356,6 +1359,21 @@ class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
 
 # Implementation copied from Apache Iceberg repo.
 def make_compatible_name(name: str) -> str:
+    """Make a field name compatible with Avro specification.
+
+    This function sanitizes field names to comply with Avro naming rules:
+    - Names must start with [A-Za-z_]
+    - Subsequent characters must be [A-Za-z0-9_]
+
+    Invalid characters are replaced with _xHHHH where HHHH is the hex code.
+    Names starting with digits get a leading underscore.
+
+    Args:
+        name: The original field name
+
+    Returns:
+        A sanitized name that complies with Avro specification
+    """
     if not _valid_avro_name(name):
         return _sanitize_name(name)
     return name
@@ -1391,7 +1409,9 @@ def _sanitize_name(name: str) -> str:
 
 
 def _sanitize_char(character: str) -> str:
-    return "_" + character if character.isdigit() else "_x" + 
hex(ord(character))[2:].upper()
+    if character.isdigit():
+        return "_" + character
+    return "_x" + hex(ord(character))[2:].upper()
 
 
 def sanitize_column_names(schema: Schema) -> Schema:
diff --git a/pyiceberg/utils/schema_conversion.py 
b/pyiceberg/utils/schema_conversion.py
index 232b8f00..551fa401 100644
--- a/pyiceberg/utils/schema_conversion.py
+++ b/pyiceberg/utils/schema_conversion.py
@@ -26,7 +26,14 @@ from typing import (
     Union,
 )
 
-from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
+from pyiceberg.schema import (
+    FIELD_ID_PROP,
+    ICEBERG_FIELD_NAME_PROP,
+    Schema,
+    SchemaVisitorPerPrimitiveType,
+    make_compatible_name,
+    visit,
+)
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -225,13 +232,13 @@ class AvroSchemaConversion:
         Returns:
             The Iceberg equivalent field.
         """
-        if "field-id" not in field:
-            raise ValueError(f"Cannot convert field, missing field-id: 
{field}")
+        if FIELD_ID_PROP not in field:
+            raise ValueError(f"Cannot convert field, missing {FIELD_ID_PROP}: 
{field}")
 
         plain_type, required = self._resolve_union(field["type"])
 
         return NestedField(
-            field_id=field["field-id"],
+            field_id=field[FIELD_ID_PROP],
             name=field["name"],
             field_type=self._convert_schema(plain_type),
             required=required,
@@ -524,12 +531,18 @@ class 
ConvertSchemaToAvro(SchemaVisitorPerPrimitiveType[AvroType]):
         if isinstance(field_result, dict) and field_result.get("type") == 
"record":
             field_result["name"] = f"r{field.field_id}"
 
+        original_name = field.name
+        sanitized_name = make_compatible_name(original_name)
+
         result = {
-            "name": field.name,
-            "field-id": field.field_id,
+            "name": sanitized_name,
+            FIELD_ID_PROP: field.field_id,
             "type": field_result if field.required else ["null", field_result],
         }
 
+        if original_name != sanitized_name:
+            result[ICEBERG_FIELD_NAME_PROP] = original_name
+
         if field.write_default is not None:
             result["default"] = field.write_default
         elif field.optional:
@@ -564,8 +577,8 @@ class 
ConvertSchemaToAvro(SchemaVisitorPerPrimitiveType[AvroType]):
                     "type": "record",
                     "name": 
f"k{self.last_map_key_field_id}_v{self.last_map_value_field_id}",
                     "fields": [
-                        {"name": "key", "type": key_result, "field-id": 
self.last_map_key_field_id},
-                        {"name": "value", "type": value_result, "field-id": 
self.last_map_value_field_id},
+                        {"name": "key", "type": key_result, FIELD_ID_PROP: 
self.last_map_key_field_id},
+                        {"name": "value", "type": value_result, FIELD_ID_PROP: 
self.last_map_value_field_id},
                     ],
                 },
                 "logicalType": "map",
diff --git a/tests/integration/test_writes/test_writes.py 
b/tests/integration/test_writes/test_writes.py
index b73680e4..173ddf78 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -1201,6 +1201,137 @@ def test_sanitize_character_partitioned(catalog: 
Catalog) -> None:
     assert len(tbl.scan().to_arrow()) == 22
 
 
[email protected]
[email protected]("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_sanitize_character_partitioned_avro_bug(catalog: Catalog) -> None:
+    table_name = "default.test_table_partitioned_sanitized_character_avro"
+    try:
+        catalog.drop_table(table_name)
+    except NoSuchTableError:
+        pass
+
+    schema = Schema(
+        NestedField(id=1, name="😎", field_type=StringType(), required=False),
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(
+            source_id=1,
+            field_id=1001,
+            transform=IdentityTransform(),
+            name="😎",
+        )
+    )
+
+    tbl = _create_table(
+        session_catalog=catalog,
+        identifier=table_name,
+        schema=schema,
+        partition_spec=partition_spec,
+        data=[
+            pa.Table.from_arrays(
+                [pa.array([str(i) for i in range(22)])], 
schema=pa.schema([pa.field("😎", pa.string(), nullable=False)])
+            )
+        ],
+    )
+
+    assert len(tbl.scan().to_arrow()) == 22
+
+    # verify that we can read the table with DuckDB
+    import duckdb
+
+    location = tbl.metadata_location
+    duckdb.sql("INSTALL iceberg; LOAD iceberg;")
+    # Configure S3 settings for DuckDB to match the catalog configuration
+    duckdb.sql("SET s3_endpoint='localhost:9000';")
+    duckdb.sql("SET s3_access_key_id='admin';")
+    duckdb.sql("SET s3_secret_access_key='password';")
+    duckdb.sql("SET s3_use_ssl=false;")
+    duckdb.sql("SET s3_url_style='path';")
+    result = duckdb.sql(f"SELECT * FROM iceberg_scan('{location}')").fetchall()
+    assert len(result) == 22
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_cross_platform_special_character_compatibility(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    """Test cross-platform compatibility with special characters in column 
names."""
+    identifier = "default.test_cross_platform_special_characters"
+
+    # Test various special characters that need sanitization
+    special_characters = [
+        "😎",  # emoji - Java produces _xD83D_xDE0E, Python produces _x1F60E
+        "a.b",  # dot - both should produce a_x2Eb
+        "a#b",  # hash - both should produce a_x23b
+        "9x",  # starts with digit - both should produce _9x
+        "x_",  # valid - should remain unchanged
+        "letter/abc",  # slash - both should produce letter_x2Fabc
+    ]
+
+    for i, special_char in enumerate(special_characters):
+        table_name = f"{identifier}_{format_version}_{i}"
+        pyiceberg_table_name = f"{identifier}_pyiceberg_{format_version}_{i}"
+
+        try:
+            session_catalog.drop_table(table_name)
+        except Exception:
+            pass
+        try:
+            session_catalog.drop_table(pyiceberg_table_name)
+        except Exception:
+            pass
+
+        try:
+            # Test 1: Spark writes, PyIceberg reads
+            spark_df = spark.createDataFrame([("test_value",)], [special_char])
+            spark_df.writeTo(table_name).using("iceberg").createOrReplace()
+
+            # Read with PyIceberg table scan
+            tbl = session_catalog.load_table(table_name)
+            pyiceberg_df = tbl.scan().to_pandas()
+            assert len(pyiceberg_df) == 1
+            assert special_char in pyiceberg_df.columns
+            assert pyiceberg_df.iloc[0][special_char] == "test_value"
+
+            # Test 2: PyIceberg writes, Spark reads
+            from pyiceberg.schema import Schema
+            from pyiceberg.types import NestedField, StringType
+
+            schema = Schema(NestedField(field_id=1, name=special_char, 
field_type=StringType(), required=True))
+
+            tbl_pyiceberg = session_catalog.create_table(
+                identifier=pyiceberg_table_name, schema=schema, 
properties={"format-version": str(format_version)}
+            )
+
+            import pyarrow as pa
+
+            # Create PyArrow schema with required field to match Iceberg schema
+            pa_schema = pa.schema([pa.field(special_char, pa.string(), 
nullable=False)])
+            data = pa.Table.from_pydict({special_char: ["pyiceberg_value"]}, 
schema=pa_schema)
+            tbl_pyiceberg.append(data)
+
+            # Read with Spark
+            spark_df_read = spark.table(pyiceberg_table_name)
+            spark_result = spark_df_read.collect()
+
+            # Verify data integrity
+            assert len(spark_result) == 1
+            assert special_char in spark_df_read.columns
+            assert spark_result[0][special_char] == "pyiceberg_value"
+
+        finally:
+            try:
+                session_catalog.drop_table(table_name)
+            except Exception:
+                pass
+            try:
+                session_catalog.drop_table(pyiceberg_table_name)
+            except Exception:
+                pass
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_table_write_subset_of_schema(session_catalog: Catalog, 
arrow_table_with_null: pa.Table, format_version: int) -> None:
diff --git a/tests/test_avro_sanitization.py b/tests/test_avro_sanitization.py
new file mode 100644
index 00000000..0ca23e31
--- /dev/null
+++ b/tests/test_avro_sanitization.py
@@ -0,0 +1,269 @@
+# type: ignore
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import tempfile
+from typing import Any, Dict
+
+from fastavro import reader
+
+import pyiceberg.avro.file as avro
+from pyiceberg.io.pyarrow import PyArrowFileIO
+from pyiceberg.schema import ICEBERG_FIELD_NAME_PROP, Schema
+from pyiceberg.typedef import Record
+from pyiceberg.types import IntegerType, NestedField, StringType
+from pyiceberg.utils.schema_conversion import AvroSchemaConversion, AvroType
+
+
+class AvroTestRecord(Record):
+    """Test record class for Avro compatibility testing."""
+
+    @property
+    def valid_field(self) -> str:
+        return self._data[0]
+
+    @property
+    def invalid_field(self) -> int:
+        return self._data[1]
+
+    @property
+    def field_starting_with_digit(self) -> str:
+        return self._data[2]
+
+
+def test_comprehensive_field_name_sanitization() -> None:
+    """Test comprehensive field name sanitization including edge cases and 
Java compatibility."""
+
+    test_cases = [
+        # Java compatibility test cases
+        ("9x", "_9x"),
+        ("x_", "x_"),
+        ("a.b", "a_x2Eb"),
+        ("☃", "_x2603"),
+        ("a#b", "a_x23b"),
+        ("123", "_123"),
+        ("_", "_"),
+        ("a", "a"),
+        ("a1", "a1"),
+        ("1a", "_1a"),
+        ("a☃b", "a_x2603b"),
+        ("name#with#hash", "name_x23with_x23hash"),
+        ("123number", "_123number"),
+        ("😎", "_x1F60E"),
+        ("😎_with_text", "_x1F60E_with_text"),
+    ]
+
+    for original_name, expected_sanitized in test_cases:
+        schema = Schema(NestedField(field_id=1, name=original_name, 
field_type=StringType(), required=True))
+
+        avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema)
+        avro_dict: Dict[str, Any] = avro_schema
+
+        assert avro_dict["fields"][0]["name"] == expected_sanitized
+
+        if original_name != expected_sanitized:
+            assert avro_dict["fields"][0][ICEBERG_FIELD_NAME_PROP] == 
original_name
+        else:
+            assert ICEBERG_FIELD_NAME_PROP not in avro_dict["fields"][0]
+
+
+def test_comprehensive_avro_compatibility() -> None:
+    """Test comprehensive Avro compatibility including complex schemas and 
file structure."""
+
+    # Create schema with various field name types
+    schema = Schema(
+        NestedField(field_id=1, name="valid_field", field_type=StringType(), 
required=True),
+        NestedField(field_id=2, name="invalid.field", 
field_type=IntegerType(), required=True),
+        NestedField(field_id=3, name="9x", field_type=StringType(), 
required=True),
+        NestedField(field_id=4, name="name#with#hash", 
field_type=StringType(), required=True),
+        NestedField(field_id=5, name="☃", field_type=IntegerType(), 
required=True),
+        NestedField(field_id=6, name="😎", field_type=IntegerType(), 
required=True),
+    )
+
+    test_records = [
+        AvroTestRecord("hello", 42, "test", "hash_value", 100, 200),
+        AvroTestRecord("goodbye", 99, "example", "another_hash", 200, 300),
+    ]
+
+    with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file:
+        tmp_avro_file = tmp_file.name
+
+    try:
+        with avro.AvroOutputFile[AvroTestRecord](
+            output_file=PyArrowFileIO().new_output(tmp_avro_file),
+            file_schema=schema,
+            schema_name="test_schema",
+            metadata={"test": "metadata"},
+        ) as output_file:
+            output_file.write_block(test_records)
+
+        with open(tmp_avro_file, "rb") as fo:
+            # Test Avro file structure
+            magic = fo.read(4)
+            assert magic == b"Obj\x01"  # Avro magic bytes
+
+            import struct
+
+            metadata_length = struct.unpack(">I", fo.read(4))[0]
+            assert metadata_length > 0
+
+            fo.seek(0)
+            avro_reader = reader(fo)
+
+            avro_schema: AvroType = avro_reader.writer_schema
+            avro_dict: Dict[str, Any] = avro_schema
+            field_names = [field["name"] for field in avro_dict["fields"]]
+
+            # Expected sanitized names (matching Java implementation)
+            expected_field_names = [
+                "valid_field",
+                "invalid_x2Efield",
+                "_9x",
+                "name_x23with_x23hash",
+                "_x2603",
+                "_x1F60E",
+            ]
+
+            assert field_names == expected_field_names
+
+            # Verify iceberg-field-name properties
+            for field in avro_dict["fields"]:
+                field_dict: Dict[str, Any] = field
+                if field_dict["name"] == "invalid_x2Efield":
+                    assert "iceberg-field-name" in field_dict
+                    assert field_dict["iceberg-field-name"] == "invalid.field"
+                elif field_dict["name"] == "_9x":
+                    assert "iceberg-field-name" in field_dict
+                    assert field_dict["iceberg-field-name"] == "9x"
+                elif field_dict["name"] == "name_x23with_x23hash":
+                    assert "iceberg-field-name" in field_dict
+                    assert field_dict["iceberg-field-name"] == "name#with#hash"
+                elif field_dict["name"] == "_x2603":
+                    assert "iceberg-field-name" in field_dict
+                    assert field_dict["iceberg-field-name"] == "☃"
+                elif field_dict["name"] == "_x1F60E":
+                    assert "iceberg-field-name" in field_dict
+                    assert field_dict["iceberg-field-name"] == "😎"
+                else:
+                    assert "iceberg-field-name" not in field_dict
+
+            records = list(avro_reader)
+            assert len(records) == 2
+
+            # Verify data integrity
+            first_record = records[0]
+            assert first_record["valid_field"] == "hello"
+            assert first_record["invalid_x2Efield"] == 42
+            assert first_record["_9x"] == "test"
+            assert first_record["name_x23with_x23hash"] == "hash_value"
+            assert first_record["_x2603"] == 100
+            assert first_record["_x1F60E"] == 200
+
+            second_record = records[1]
+            assert second_record["valid_field"] == "goodbye"
+            assert second_record["invalid_x2Efield"] == 99
+            assert second_record["_9x"] == "example"
+            assert second_record["name_x23with_x23hash"] == "another_hash"
+            assert second_record["_x2603"] == 200
+            assert second_record["_x1F60E"] == 300
+
+            assert avro_reader.metadata.get("test") == "metadata"
+
+    finally:
+        import os
+
+        if os.path.exists(tmp_avro_file):
+            os.unlink(tmp_avro_file)
+
+
+def test_emoji_field_name_sanitization() -> None:
+    """Test that emoji field names are properly sanitized according to Java 
implementation."""
+
+    schema = Schema(
+        NestedField(field_id=1, name="😎", field_type=IntegerType(), 
required=True),
+        NestedField(field_id=2, name="valid_field", field_type=StringType(), 
required=True),
+        NestedField(field_id=3, name="😎_with_text", field_type=StringType(), 
required=True),
+    )
+
+    avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema, 
schema_name="emoji_test")
+    avro_dict: Dict[str, Any] = avro_schema
+
+    field_names = [field["name"] for field in avro_dict["fields"]]
+    expected_field_names = [
+        "_x1F60E",  # 😎 becomes _x1F60E (Unicode 0x1F60E)
+        "valid_field",
+        "_x1F60E_with_text",
+    ]
+
+    assert field_names == expected_field_names
+
+    for field in avro_dict["fields"]:
+        field_dict: Dict[str, Any] = field
+        if field_dict["name"] == "_x1F60E":
+            assert field_dict["iceberg-field-name"] == "😎"
+        elif field_dict["name"] == "_x1F60E_with_text":
+            assert field_dict["iceberg-field-name"] == "😎_with_text"
+        else:
+            assert "iceberg-field-name" not in field_dict
+
+    test_records = [
+        AvroTestRecord(42, "hello", "world"),
+    ]
+
+    with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file:
+        tmp_avro_file = tmp_file.name
+
+    try:
+        with avro.AvroOutputFile[AvroTestRecord](
+            output_file=PyArrowFileIO().new_output(tmp_avro_file),
+            file_schema=schema,
+            schema_name="emoji_test",
+        ) as output_file:
+            output_file.write_block(test_records)
+
+        with open(tmp_avro_file, "rb") as fo:
+            avro_reader = reader(fo)
+
+            avro_schema_reader: AvroType = avro_reader.writer_schema
+            avro_dict_reader: Dict[str, Any] = avro_schema_reader
+            field_names_reader = [field["name"] for field in 
avro_dict_reader["fields"]]
+
+            assert field_names_reader == expected_field_names
+
+            for field in avro_dict_reader["fields"]:
+                field_dict_reader: Dict[str, Any] = field
+                if field_dict_reader["name"] == "_x1F60E":
+                    assert field_dict_reader["iceberg-field-name"] == "😎"
+                elif field_dict_reader["name"] == "_x1F60E_with_text":
+                    assert field_dict_reader["iceberg-field-name"] == 
"😎_with_text"
+                else:
+                    assert "iceberg-field-name" not in field_dict_reader
+
+            records = list(avro_reader)
+            assert len(records) == 1
+
+            first_record = records[0]
+            assert first_record["_x1F60E"] == 42
+            assert first_record["valid_field"] == "hello"
+            assert first_record["_x1F60E_with_text"] == "world"
+
+    finally:
+        import os
+
+        if os.path.exists(tmp_avro_file):
+            os.unlink(tmp_avro_file)

Reply via email to