This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new b6a45ed7 fix: sanitize invalid Avro field names in manifest file (#2245)
b6a45ed7 is described below
commit b6a45ed70676e1f0e832fa820a4d8b4b0bd42d23
Author: Kristofer Gaudel <[email protected]>
AuthorDate: Wed Aug 6 17:12:32 2025 -0400
fix: sanitize invalid Avro field names in manifest file (#2245)
Closes https://github.com/apache/iceberg-python/issues/2123
# Rationale for this change
Fixes the sanitization behaviour to match the specification and the Java
implementation.
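For reference, a minimal sketch of the corrected behaviour; the expected
outputs below are taken from the unit tests added in this commit:

```python
# Sketch only: expected values come from tests/test_avro_sanitization.py below.
from pyiceberg.schema import make_compatible_name

assert make_compatible_name("x_") == "x_"       # already valid, left unchanged
assert make_compatible_name("9x") == "_9x"      # leading digit gains an underscore
assert make_compatible_name("a.b") == "a_x2Eb"  # '.' becomes _x2E (uppercase hex of the code point)
assert make_compatible_name("😎") == "_x1F60E"  # non-ASCII becomes _x + uppercase hex
```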
# Are these changes tested?
Yes - Unit and integration tests
# Are there any user-facing changes?
Yes - Field names will be sanitized to be Avro-compatible if they are not
already.
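When a name is sanitized, the original name is preserved on the Avro field
under the `iceberg-field-name` property, so readers can map the sanitized
field back to the Iceberg schema. Roughly, for a required string field named
`a.b`, the generated Avro field looks like this (a sketch, not verbatim
output; see the `ConvertSchemaToAvro` change below):

```python
{
    "name": "a_x2Eb",             # sanitized, Avro-safe name
    "field-id": 1,                # unchanged Iceberg field id
    "iceberg-field-name": "a.b",  # original name, only present when sanitized
    "type": "string",             # required field, so no ["null", ...] union
}
```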
---------
Co-authored-by: Kevin Liu <[email protected]>
---
pyiceberg/schema.py | 22 ++-
pyiceberg/utils/schema_conversion.py | 29 ++-
tests/integration/test_writes/test_writes.py | 131 +++++++++++++
tests/test_avro_sanitization.py | 269 +++++++++++++++++++++++++++
4 files changed, 442 insertions(+), 9 deletions(-)
diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py
index 6333ace6..1eadc583 100644
--- a/pyiceberg/schema.py
+++ b/pyiceberg/schema.py
@@ -78,6 +78,9 @@ P = TypeVar("P")
INITIAL_SCHEMA_ID = 0
+FIELD_ID_PROP = "field-id"
+ICEBERG_FIELD_NAME_PROP = "iceberg-field-name"
+
class Schema(IcebergBaseModel):
"""A table Schema.
@@ -1356,6 +1359,21 @@ class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
# Implementation copied from Apache Iceberg repo.
def make_compatible_name(name: str) -> str:
+ """Make a field name compatible with Avro specification.
+
+ This function sanitizes field names to comply with Avro naming rules:
+ - Names must start with [A-Za-z_]
+ - Subsequent characters must be [A-Za-z0-9_]
+
+ Invalid characters are replaced with _xHHHH where HHHH is the hex code.
+ Names starting with digits get a leading underscore.
+
+ Args:
+ name: The original field name
+
+ Returns:
+ A sanitized name that complies with Avro specification
+ """
if not _valid_avro_name(name):
return _sanitize_name(name)
return name
@@ -1391,7 +1409,9 @@ def _sanitize_name(name: str) -> str:
def _sanitize_char(character: str) -> str:
- return "_" + character if character.isdigit() else "_x" +
hex(ord(character))[2:].upper()
+ if character.isdigit():
+ return "_" + character
+ return "_x" + hex(ord(character))[2:].upper()
def sanitize_column_names(schema: Schema) -> Schema:
diff --git a/pyiceberg/utils/schema_conversion.py b/pyiceberg/utils/schema_conversion.py
index 232b8f00..551fa401 100644
--- a/pyiceberg/utils/schema_conversion.py
+++ b/pyiceberg/utils/schema_conversion.py
@@ -26,7 +26,14 @@ from typing import (
Union,
)
-from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
+from pyiceberg.schema import (
+ FIELD_ID_PROP,
+ ICEBERG_FIELD_NAME_PROP,
+ Schema,
+ SchemaVisitorPerPrimitiveType,
+ make_compatible_name,
+ visit,
+)
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -225,13 +232,13 @@ class AvroSchemaConversion:
Returns:
The Iceberg equivalent field.
"""
- if "field-id" not in field:
- raise ValueError(f"Cannot convert field, missing field-id:
{field}")
+ if FIELD_ID_PROP not in field:
+ raise ValueError(f"Cannot convert field, missing {FIELD_ID_PROP}:
{field}")
plain_type, required = self._resolve_union(field["type"])
return NestedField(
- field_id=field["field-id"],
+ field_id=field[FIELD_ID_PROP],
name=field["name"],
field_type=self._convert_schema(plain_type),
required=required,
@@ -524,12 +531,18 @@ class ConvertSchemaToAvro(SchemaVisitorPerPrimitiveType[AvroType]):
if isinstance(field_result, dict) and field_result.get("type") == "record":
field_result["name"] = f"r{field.field_id}"
+ original_name = field.name
+ sanitized_name = make_compatible_name(original_name)
+
result = {
- "name": field.name,
- "field-id": field.field_id,
+ "name": sanitized_name,
+ FIELD_ID_PROP: field.field_id,
"type": field_result if field.required else ["null", field_result],
}
+ if original_name != sanitized_name:
+ result[ICEBERG_FIELD_NAME_PROP] = original_name
+
if field.write_default is not None:
result["default"] = field.write_default
elif field.optional:
@@ -564,8 +577,8 @@ class ConvertSchemaToAvro(SchemaVisitorPerPrimitiveType[AvroType]):
"type": "record",
"name": f"k{self.last_map_key_field_id}_v{self.last_map_value_field_id}",
"fields": [
- {"name": "key", "type": key_result, "field-id": self.last_map_key_field_id},
- {"name": "value", "type": value_result, "field-id": self.last_map_value_field_id},
+ {"name": "key", "type": key_result, FIELD_ID_PROP: self.last_map_key_field_id},
+ {"name": "value", "type": value_result, FIELD_ID_PROP: self.last_map_value_field_id},
],
},
"logicalType": "map",
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index b73680e4..173ddf78 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -1201,6 +1201,137 @@ def test_sanitize_character_partitioned(catalog: Catalog) -> None:
assert len(tbl.scan().to_arrow()) == 22
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_sanitize_character_partitioned_avro_bug(catalog: Catalog) -> None:
+ table_name = "default.test_table_partitioned_sanitized_character_avro"
+ try:
+ catalog.drop_table(table_name)
+ except NoSuchTableError:
+ pass
+
+ schema = Schema(
+ NestedField(id=1, name="😎", field_type=StringType(), required=False),
+ )
+
+ partition_spec = PartitionSpec(
+ PartitionField(
+ source_id=1,
+ field_id=1001,
+ transform=IdentityTransform(),
+ name="😎",
+ )
+ )
+
+ tbl = _create_table(
+ session_catalog=catalog,
+ identifier=table_name,
+ schema=schema,
+ partition_spec=partition_spec,
+ data=[
+ pa.Table.from_arrays(
+ [pa.array([str(i) for i in range(22)])], schema=pa.schema([pa.field("😎", pa.string(), nullable=False)])
+ )
+ ],
+ )
+
+ assert len(tbl.scan().to_arrow()) == 22
+
+ # verify that we can read the table with DuckDB
+ import duckdb
+
+ location = tbl.metadata_location
+ duckdb.sql("INSTALL iceberg; LOAD iceberg;")
+ # Configure S3 settings for DuckDB to match the catalog configuration
+ duckdb.sql("SET s3_endpoint='localhost:9000';")
+ duckdb.sql("SET s3_access_key_id='admin';")
+ duckdb.sql("SET s3_secret_access_key='password';")
+ duckdb.sql("SET s3_use_ssl=false;")
+ duckdb.sql("SET s3_url_style='path';")
+ result = duckdb.sql(f"SELECT * FROM iceberg_scan('{location}')").fetchall()
+ assert len(result) == 22
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_cross_platform_special_character_compatibility(
+ spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+ """Test cross-platform compatibility with special characters in column
names."""
+ identifier = "default.test_cross_platform_special_characters"
+
+ # Test various special characters that need sanitization
+ special_characters = [
+ "😎", # emoji - Java produces _xD83D_xDE0E, Python produces _x1F60E
+ "a.b", # dot - both should produce a_x2Eb
+ "a#b", # hash - both should produce a_x23b
+ "9x", # starts with digit - both should produce _9x
+ "x_", # valid - should remain unchanged
+ "letter/abc", # slash - both should produce letter_x2Fabc
+ ]
+
+ for i, special_char in enumerate(special_characters):
+ table_name = f"{identifier}_{format_version}_{i}"
+ pyiceberg_table_name = f"{identifier}_pyiceberg_{format_version}_{i}"
+
+ try:
+ session_catalog.drop_table(table_name)
+ except Exception:
+ pass
+ try:
+ session_catalog.drop_table(pyiceberg_table_name)
+ except Exception:
+ pass
+
+ try:
+ # Test 1: Spark writes, PyIceberg reads
+ spark_df = spark.createDataFrame([("test_value",)], [special_char])
+ spark_df.writeTo(table_name).using("iceberg").createOrReplace()
+
+ # Read with PyIceberg table scan
+ tbl = session_catalog.load_table(table_name)
+ pyiceberg_df = tbl.scan().to_pandas()
+ assert len(pyiceberg_df) == 1
+ assert special_char in pyiceberg_df.columns
+ assert pyiceberg_df.iloc[0][special_char] == "test_value"
+
+ # Test 2: PyIceberg writes, Spark reads
+ from pyiceberg.schema import Schema
+ from pyiceberg.types import NestedField, StringType
+
+ schema = Schema(NestedField(field_id=1, name=special_char, field_type=StringType(), required=True))
+
+ tbl_pyiceberg = session_catalog.create_table(
+ identifier=pyiceberg_table_name, schema=schema, properties={"format-version": str(format_version)}
+ )
+
+ import pyarrow as pa
+
+ # Create PyArrow schema with required field to match Iceberg schema
+ pa_schema = pa.schema([pa.field(special_char, pa.string(), nullable=False)])
+ data = pa.Table.from_pydict({special_char: ["pyiceberg_value"]},
schema=pa_schema)
+ tbl_pyiceberg.append(data)
+
+ # Read with Spark
+ spark_df_read = spark.table(pyiceberg_table_name)
+ spark_result = spark_df_read.collect()
+
+ # Verify data integrity
+ assert len(spark_result) == 1
+ assert special_char in spark_df_read.columns
+ assert spark_result[0][special_char] == "pyiceberg_value"
+
+ finally:
+ try:
+ session_catalog.drop_table(table_name)
+ except Exception:
+ pass
+ try:
+ session_catalog.drop_table(pyiceberg_table_name)
+ except Exception:
+ pass
+
+
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
diff --git a/tests/test_avro_sanitization.py b/tests/test_avro_sanitization.py
new file mode 100644
index 00000000..0ca23e31
--- /dev/null
+++ b/tests/test_avro_sanitization.py
@@ -0,0 +1,269 @@
+# type: ignore
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import tempfile
+from typing import Any, Dict
+
+from fastavro import reader
+
+import pyiceberg.avro.file as avro
+from pyiceberg.io.pyarrow import PyArrowFileIO
+from pyiceberg.schema import ICEBERG_FIELD_NAME_PROP, Schema
+from pyiceberg.typedef import Record
+from pyiceberg.types import IntegerType, NestedField, StringType
+from pyiceberg.utils.schema_conversion import AvroSchemaConversion, AvroType
+
+
+class AvroTestRecord(Record):
+ """Test record class for Avro compatibility testing."""
+
+ @property
+ def valid_field(self) -> str:
+ return self._data[0]
+
+ @property
+ def invalid_field(self) -> int:
+ return self._data[1]
+
+ @property
+ def field_starting_with_digit(self) -> str:
+ return self._data[2]
+
+
+def test_comprehensive_field_name_sanitization() -> None:
+ """Test comprehensive field name sanitization including edge cases and
Java compatibility."""
+
+ test_cases = [
+ # Java compatibility test cases
+ ("9x", "_9x"),
+ ("x_", "x_"),
+ ("a.b", "a_x2Eb"),
+ ("☃", "_x2603"),
+ ("a#b", "a_x23b"),
+ ("123", "_123"),
+ ("_", "_"),
+ ("a", "a"),
+ ("a1", "a1"),
+ ("1a", "_1a"),
+ ("a☃b", "a_x2603b"),
+ ("name#with#hash", "name_x23with_x23hash"),
+ ("123number", "_123number"),
+ ("😎", "_x1F60E"),
+ ("😎_with_text", "_x1F60E_with_text"),
+ ]
+
+ for original_name, expected_sanitized in test_cases:
+ schema = Schema(NestedField(field_id=1, name=original_name, field_type=StringType(), required=True))
+
+ avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema)
+ avro_dict: Dict[str, Any] = avro_schema
+
+ assert avro_dict["fields"][0]["name"] == expected_sanitized
+
+ if original_name != expected_sanitized:
+ assert avro_dict["fields"][0][ICEBERG_FIELD_NAME_PROP] ==
original_name
+ else:
+ assert ICEBERG_FIELD_NAME_PROP not in avro_dict["fields"][0]
+
+
+def test_comprehensive_avro_compatibility() -> None:
+ """Test comprehensive Avro compatibility including complex schemas and
file structure."""
+
+ # Create schema with various field name types
+ schema = Schema(
+ NestedField(field_id=1, name="valid_field", field_type=StringType(),
required=True),
+ NestedField(field_id=2, name="invalid.field",
field_type=IntegerType(), required=True),
+ NestedField(field_id=3, name="9x", field_type=StringType(),
required=True),
+ NestedField(field_id=4, name="name#with#hash",
field_type=StringType(), required=True),
+ NestedField(field_id=5, name="☃", field_type=IntegerType(),
required=True),
+ NestedField(field_id=6, name="😎", field_type=IntegerType(),
required=True),
+ )
+
+ test_records = [
+ AvroTestRecord("hello", 42, "test", "hash_value", 100, 200),
+ AvroTestRecord("goodbye", 99, "example", "another_hash", 200, 300),
+ ]
+
+ with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file:
+ tmp_avro_file = tmp_file.name
+
+ try:
+ with avro.AvroOutputFile[AvroTestRecord](
+ output_file=PyArrowFileIO().new_output(tmp_avro_file),
+ file_schema=schema,
+ schema_name="test_schema",
+ metadata={"test": "metadata"},
+ ) as output_file:
+ output_file.write_block(test_records)
+
+ with open(tmp_avro_file, "rb") as fo:
+ # Test Avro file structure
+ magic = fo.read(4)
+ assert magic == b"Obj\x01" # Avro magic bytes
+
+ import struct
+
+ metadata_length = struct.unpack(">I", fo.read(4))[0]
+ assert metadata_length > 0
+
+ fo.seek(0)
+ avro_reader = reader(fo)
+
+ avro_schema: AvroType = avro_reader.writer_schema
+ avro_dict: Dict[str, Any] = avro_schema
+ field_names = [field["name"] for field in avro_dict["fields"]]
+
+ # Expected sanitized names (matching Java implementation)
+ expected_field_names = [
+ "valid_field",
+ "invalid_x2Efield",
+ "_9x",
+ "name_x23with_x23hash",
+ "_x2603",
+ "_x1F60E",
+ ]
+
+ assert field_names == expected_field_names
+
+ # Verify iceberg-field-name properties
+ for field in avro_dict["fields"]:
+ field_dict: Dict[str, Any] = field
+ if field_dict["name"] == "invalid_x2Efield":
+ assert "iceberg-field-name" in field_dict
+ assert field_dict["iceberg-field-name"] == "invalid.field"
+ elif field_dict["name"] == "_9x":
+ assert "iceberg-field-name" in field_dict
+ assert field_dict["iceberg-field-name"] == "9x"
+ elif field_dict["name"] == "name_x23with_x23hash":
+ assert "iceberg-field-name" in field_dict
+ assert field_dict["iceberg-field-name"] == "name#with#hash"
+ elif field_dict["name"] == "_x2603":
+ assert "iceberg-field-name" in field_dict
+ assert field_dict["iceberg-field-name"] == "☃"
+ elif field_dict["name"] == "_x1F60E":
+ assert "iceberg-field-name" in field_dict
+ assert field_dict["iceberg-field-name"] == "😎"
+ else:
+ assert "iceberg-field-name" not in field_dict
+
+ records = list(avro_reader)
+ assert len(records) == 2
+
+ # Verify data integrity
+ first_record = records[0]
+ assert first_record["valid_field"] == "hello"
+ assert first_record["invalid_x2Efield"] == 42
+ assert first_record["_9x"] == "test"
+ assert first_record["name_x23with_x23hash"] == "hash_value"
+ assert first_record["_x2603"] == 100
+ assert first_record["_x1F60E"] == 200
+
+ second_record = records[1]
+ assert second_record["valid_field"] == "goodbye"
+ assert second_record["invalid_x2Efield"] == 99
+ assert second_record["_9x"] == "example"
+ assert second_record["name_x23with_x23hash"] == "another_hash"
+ assert second_record["_x2603"] == 200
+ assert second_record["_x1F60E"] == 300
+
+ assert avro_reader.metadata.get("test") == "metadata"
+
+ finally:
+ import os
+
+ if os.path.exists(tmp_avro_file):
+ os.unlink(tmp_avro_file)
+
+
+def test_emoji_field_name_sanitization() -> None:
+ """Test that emoji field names are properly sanitized according to Java
implementation."""
+
+ schema = Schema(
+ NestedField(field_id=1, name="😎", field_type=IntegerType(),
required=True),
+ NestedField(field_id=2, name="valid_field", field_type=StringType(),
required=True),
+ NestedField(field_id=3, name="😎_with_text", field_type=StringType(),
required=True),
+ )
+
+ avro_schema: AvroType = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="emoji_test")
+ avro_dict: Dict[str, Any] = avro_schema
+
+ field_names = [field["name"] for field in avro_dict["fields"]]
+ expected_field_names = [
+ "_x1F60E", # 😎 becomes _x1F60E (Unicode 0x1F60E)
+ "valid_field",
+ "_x1F60E_with_text",
+ ]
+
+ assert field_names == expected_field_names
+
+ for field in avro_dict["fields"]:
+ field_dict: Dict[str, Any] = field
+ if field_dict["name"] == "_x1F60E":
+ assert field_dict["iceberg-field-name"] == "😎"
+ elif field_dict["name"] == "_x1F60E_with_text":
+ assert field_dict["iceberg-field-name"] == "😎_with_text"
+ else:
+ assert "iceberg-field-name" not in field_dict
+
+ test_records = [
+ AvroTestRecord(42, "hello", "world"),
+ ]
+
+ with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as tmp_file:
+ tmp_avro_file = tmp_file.name
+
+ try:
+ with avro.AvroOutputFile[AvroTestRecord](
+ output_file=PyArrowFileIO().new_output(tmp_avro_file),
+ file_schema=schema,
+ schema_name="emoji_test",
+ ) as output_file:
+ output_file.write_block(test_records)
+
+ with open(tmp_avro_file, "rb") as fo:
+ avro_reader = reader(fo)
+
+ avro_schema_reader: AvroType = avro_reader.writer_schema
+ avro_dict_reader: Dict[str, Any] = avro_schema_reader
+ field_names_reader = [field["name"] for field in
avro_dict_reader["fields"]]
+
+ assert field_names_reader == expected_field_names
+
+ for field in avro_dict_reader["fields"]:
+ field_dict_reader: Dict[str, Any] = field
+ if field_dict_reader["name"] == "_x1F60E":
+ assert field_dict_reader["iceberg-field-name"] == "😎"
+ elif field_dict_reader["name"] == "_x1F60E_with_text":
+ assert field_dict_reader["iceberg-field-name"] ==
"😎_with_text"
+ else:
+ assert "iceberg-field-name" not in field_dict_reader
+
+ records = list(avro_reader)
+ assert len(records) == 1
+
+ first_record = records[0]
+ assert first_record["_x1F60E"] == 42
+ assert first_record["valid_field"] == "hello"
+ assert first_record["_x1F60E_with_text"] == "world"
+
+ finally:
+ import os
+
+ if os.path.exists(tmp_avro_file):
+ os.unlink(tmp_avro_file)