This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 8042d822 Convert `_get_column_projection_values` to use Field-IDs
(#2293)
8042d822 is described below
commit 8042d822a34361bcb939a3a2ce3e7a4889235e96
Author: Fokko Driesprong <[email protected]>
AuthorDate: Thu Aug 7 08:18:00 2025 +0200
Convert `_get_column_projection_values` to use Field-IDs (#2293)
# Rationale for this change
This is a refactor of the `_get_column_projection_values` function to rely on
field-IDs rather than names. Field IDs will never change, while
partitions and column names can be updated in a table's lifetime.
# Are these changes tested?
# Are there any user-facing changes?
<!-- In the case of user-facing changes, please add the changelog label.
-->
---
pyiceberg/expressions/visitors.py | 19 ++++---
pyiceberg/io/pyarrow.py | 65 ++++++++--------------
tests/conftest.py | 4 +-
tests/expressions/test_visitors.py | 35 ++++++++++--
.../test_writes/test_partitioned_writes.py | 4 +-
tests/io/test_pyarrow.py | 26 +++++----
6 files changed, 87 insertions(+), 66 deletions(-)
diff --git a/pyiceberg/expressions/visitors.py
b/pyiceberg/expressions/visitors.py
index 99cbc0fb..779f2b47 100644
--- a/pyiceberg/expressions/visitors.py
+++ b/pyiceberg/expressions/visitors.py
@@ -861,7 +861,7 @@ class
_ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
Args:
file_schema (Schema): The schema of the file.
case_sensitive (bool): Whether to consider case when binding a reference
to a field in a schema, defaults to True.
- projected_field_values (Dict[str, Any]): Values for projected fields not
present in the data file.
+ projected_field_values (Dict[int, Any]): Values for projected fields not
present in the data file.
Raises:
TypeError: In the case of an UnboundPredicate.
@@ -870,12 +870,12 @@ class
_ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
file_schema: Schema
case_sensitive: bool
- projected_field_values: Dict[str, Any]
+ projected_field_values: Dict[int, Any]
- def __init__(self, file_schema: Schema, case_sensitive: bool,
projected_field_values: Dict[str, Any] = EMPTY_DICT) -> None:
+ def __init__(self, file_schema: Schema, case_sensitive: bool,
projected_field_values: Dict[int, Any] = EMPTY_DICT) -> None:
self.file_schema = file_schema
self.case_sensitive = case_sensitive
- self.projected_field_values = projected_field_values or {}
+ self.projected_field_values = projected_field_values
def visit_true(self) -> BooleanExpression:
return AlwaysTrue()
@@ -897,7 +897,8 @@ class
_ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
def visit_bound_predicate(self, predicate: BoundPredicate[L]) ->
BooleanExpression:
field = predicate.term.ref().field
- file_column_name = self.file_schema.find_column_name(field.field_id)
+ field_id = field.field_id
+ file_column_name = self.file_schema.find_column_name(field_id)
if file_column_name is None:
# In the case of schema evolution or column projection, the field
might not be present in the file schema.
@@ -915,8 +916,10 @@ class
_ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
# In the order described by the "Column Projection" section of the
Iceberg spec:
# https://iceberg.apache.org/spec/#column-projection
# Evaluate column projection first if it exists
- if projected_field_value :=
self.projected_field_values.get(field.name):
- if expression_evaluator(Schema(field), pred,
case_sensitive=self.case_sensitive)(Record(projected_field_value)):
+ if field_id in self.projected_field_values:
+ if expression_evaluator(Schema(field), pred,
case_sensitive=self.case_sensitive)(
+ Record(self.projected_field_values[field_id])
+ ):
return AlwaysTrue()
# Evaluate initial_default value
@@ -937,7 +940,7 @@ class
_ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
def translate_column_names(
- expr: BooleanExpression, file_schema: Schema, case_sensitive: bool,
projected_field_values: Dict[str, Any] = EMPTY_DICT
+ expr: BooleanExpression, file_schema: Schema, case_sensitive: bool,
projected_field_values: Dict[int, Any] = EMPTY_DICT
) -> BooleanExpression:
return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive,
projected_field_values))
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index e6992843..cee2ccac 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -131,7 +131,6 @@ from pyiceberg.manifest import (
)
from pyiceberg.partitioning import PartitionField, PartitionFieldValue,
PartitionKey, PartitionSpec, partition_record_value
from pyiceberg.schema import (
- Accessor,
PartnerAccessor,
PreOrderSchemaVisitor,
Schema,
@@ -1402,41 +1401,23 @@ class _ConvertToIcebergWithoutIDs(_ConvertToIceberg):
def _get_column_projection_values(
file: DataFile, projected_schema: Schema, partition_spec:
Optional[PartitionSpec], file_project_field_ids: Set[int]
-) -> Tuple[bool, Dict[str, Any]]:
+) -> Dict[int, Any]:
"""Apply Column Projection rules to File Schema."""
project_schema_diff =
projected_schema.field_ids.difference(file_project_field_ids)
- should_project_columns = len(project_schema_diff) > 0
- projected_missing_fields: Dict[str, Any] = {}
+ if len(project_schema_diff) == 0 or partition_spec is None:
+ return EMPTY_DICT
- if not should_project_columns:
- return False, {}
-
- partition_schema: StructType
- accessors: Dict[int, Accessor]
-
- if partition_spec is not None:
- partition_schema = partition_spec.partition_type(projected_schema)
- accessors = build_position_accessors(partition_schema)
- else:
- return False, {}
+ partition_schema = partition_spec.partition_type(projected_schema)
+ accessors = build_position_accessors(partition_schema)
+ projected_missing_fields = {}
for field_id in project_schema_diff:
for partition_field in partition_spec.fields_by_source_id(field_id):
if isinstance(partition_field.transform, IdentityTransform):
- accessor = accessors.get(partition_field.field_id)
-
- if accessor is None:
- continue
+ if partition_value :=
accessors[partition_field.field_id].get(file.partition):
+ projected_missing_fields[field_id] = partition_value
- # The partition field may not exist in the partition record of
the data file.
- # This can happen when new partition fields are introduced
after the file was written.
- try:
- if partition_value := accessor.get(file.partition):
- projected_missing_fields[partition_field.name] =
partition_value
- except IndexError:
- continue
-
- return True, projected_missing_fields
+ return projected_missing_fields
def _task_to_record_batches(
@@ -1460,9 +1441,8 @@ def _task_to_record_batches(
# the table format version.
file_schema = pyarrow_to_schema(physical_schema, name_mapping,
downcast_ns_timestamp_to_us=True)
- # Apply column projection rules
- # https://iceberg.apache.org/spec/#column-projection
- should_project_columns, projected_missing_fields =
_get_column_projection_values(
+ # Apply column projection rules:
https://iceberg.apache.org/spec/#column-projection
+ projected_missing_fields = _get_column_projection_values(
task.file, projected_schema, partition_spec, file_schema.field_ids
)
@@ -1517,16 +1497,9 @@ def _task_to_record_batches(
file_project_schema,
current_batch,
downcast_ns_timestamp_to_us=True,
+ projected_missing_fields=projected_missing_fields,
)
- # Inject projected column values if available
- if should_project_columns:
- for name, value in projected_missing_fields.items():
- index = result_batch.schema.get_field_index(name)
- if index != -1:
- arr = pa.repeat(value, result_batch.num_rows)
- result_batch = result_batch.set_column(index, name,
arr)
-
yield result_batch
@@ -1696,7 +1669,7 @@ class ArrowScan:
deletes_per_file.get(task.file.file_path),
self._case_sensitive,
self._table_metadata.name_mapping(),
- self._table_metadata.spec(),
+ self._table_metadata.specs().get(task.file.spec_id),
)
for batch in batches:
if self._limit is not None:
@@ -1714,12 +1687,15 @@ def _to_requested_schema(
batch: pa.RecordBatch,
downcast_ns_timestamp_to_us: bool = False,
include_field_ids: bool = False,
+ projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
) -> pa.RecordBatch:
# We could reuse some of these visitors
struct_array = visit_with_partner(
requested_schema,
batch,
- ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us,
include_field_ids),
+ ArrowProjectionVisitor(
+ file_schema, downcast_ns_timestamp_to_us, include_field_ids,
projected_missing_fields=projected_missing_fields
+ ),
ArrowAccessor(file_schema),
)
return pa.RecordBatch.from_struct_array(struct_array)
@@ -1730,6 +1706,7 @@ class
ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
_include_field_ids: bool
_downcast_ns_timestamp_to_us: bool
_use_large_types: Optional[bool]
+ _projected_missing_fields: Dict[int, Any]
def __init__(
self,
@@ -1737,11 +1714,13 @@ class
ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
downcast_ns_timestamp_to_us: bool = False,
include_field_ids: bool = False,
use_large_types: Optional[bool] = None,
+ projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
) -> None:
self._file_schema = file_schema
self._include_field_ids = include_field_ids
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
self._use_large_types = use_large_types
+ self._projected_missing_fields = projected_missing_fields
if use_large_types is not None:
deprecation_message(
@@ -1821,7 +1800,9 @@ class
ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
elif field.optional or field.initial_default is not None:
# When an optional field is added, or when a required field
with a non-null initial default is added
arrow_type = schema_to_pyarrow(field.field_type,
include_field_ids=self._include_field_ids)
- if field.initial_default is None:
+ if projected_value :=
self._projected_missing_fields.get(field.field_id):
+ field_arrays.append(pa.repeat(pa.scalar(projected_value,
type=arrow_type), len(struct_array)))
+ elif field.initial_default is None:
field_arrays.append(pa.nulls(len(struct_array),
type=arrow_type))
else:
field_arrays.append(pa.repeat(field.initial_default,
len(struct_array)))
diff --git a/tests/conftest.py b/tests/conftest.py
index c01ccc97..16c9e06d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2375,8 +2375,10 @@ def data_file(table_schema_simple: Schema, tmp_path:
str) -> str:
@pytest.fixture
def example_task(data_file: str) -> FileScanTask:
+ datafile = DataFile.from_args(file_path=data_file,
file_format=FileFormat.PARQUET, file_size_in_bytes=1925)
+ datafile.spec_id = 0
return FileScanTask(
- data_file=DataFile.from_args(file_path=data_file,
file_format=FileFormat.PARQUET, file_size_in_bytes=1925),
+ data_file=datafile,
)
diff --git a/tests/expressions/test_visitors.py
b/tests/expressions/test_visitors.py
index f02aadfe..997cc7f7 100644
--- a/tests/expressions/test_visitors.py
+++ b/tests/expressions/test_visitors.py
@@ -1730,6 +1730,33 @@ def
test_translate_column_names_missing_column_match_null() -> None:
assert translated_expr == AlwaysTrue()
+def test_translate_column_names_missing_column_match_explicit_null() -> None:
+ """Test translate_column_names when missing column matches null."""
+ # Original schema
+ original_schema = Schema(
+ NestedField(field_id=1, name="existing_col", field_type=StringType(),
required=False),
+ NestedField(field_id=2, name="missing_col", field_type=IntegerType(),
required=False),
+ schema_id=1,
+ )
+
+ # Create bound expression for the missing column
+ unbound_expr = IsNull("missing_col")
+ bound_expr = visit(unbound_expr,
visitor=BindVisitor(schema=original_schema, case_sensitive=True))
+
+ # File schema only has the existing column (field_id=1), missing field_id=2
+ file_schema = Schema(
+ NestedField(field_id=1, name="existing_col", field_type=StringType(),
required=False),
+ schema_id=1,
+ )
+
+ # Translate column names
+ translated_expr = translate_column_names(bound_expr, file_schema,
case_sensitive=True, projected_field_values={2: None})
+
+ # Should evaluate to AlwaysTrue because the missing column is treated as
null
+ # missing_col's default initial_default (None) satisfies the IsNull
predicate
+ assert translated_expr == AlwaysTrue()
+
+
def test_translate_column_names_missing_column_with_initial_default() -> None:
"""Test translate_column_names when missing column's initial_default
matches expression."""
# Original schema
@@ -1801,7 +1828,7 @@ def
test_translate_column_names_missing_column_with_projected_field_matches() ->
)
# Projected column that is missing in the file schema
- projected_field_values = {"missing_col": 42}
+ projected_field_values = {2: 42}
# Translate column names
translated_expr = translate_column_names(
@@ -1833,7 +1860,7 @@ def
test_translate_column_names_missing_column_with_projected_field_mismatch() -
)
# Projected column that is missing in the file schema
- projected_field_values = {"missing_col": 1}
+ projected_field_values = {2: 1}
# Translate column names
translated_expr = translate_column_names(
@@ -1864,7 +1891,7 @@ def
test_translate_column_names_missing_column_projected_field_fallbacks_to_init
)
# Projected field value that differs from both the expression literal and
initial_default
- projected_field_values = {"missing_col": 10} # This doesn't match
expression literal (42)
+ projected_field_values = {2: 10} # This doesn't match expression literal
(42)
# Translate column names
translated_expr = translate_column_names(
@@ -1895,7 +1922,7 @@ def
test_translate_column_names_missing_column_projected_field_matches_initial_d
)
# Projected field value that matches the expression literal
- projected_field_values = {"missing_col": 10} # This doesn't match
expression literal (42)
+ projected_field_values = {2: 10} # This doesn't match expression literal
(42)
# Translate column names
translated_expr = translate_column_names(
diff --git a/tests/integration/test_writes/test_partitioned_writes.py
b/tests/integration/test_writes/test_partitioned_writes.py
index b2f6ad41..e9698067 100644
--- a/tests/integration/test_writes/test_partitioned_writes.py
+++ b/tests/integration/test_writes/test_partitioned_writes.py
@@ -711,8 +711,10 @@ def
test_dynamic_partition_overwrite_evolve_partition(spark: SparkSession, sessi
)
identifier =
f"default.partitioned_{format_version}_test_dynamic_partition_overwrite_evolve_partition"
- with pytest.raises(NoSuchTableError):
+ try:
session_catalog.drop_table(identifier)
+ except NoSuchTableError:
+ pass
tbl = session_catalog.create_table(
identifier=identifier,
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index ac16ef18..f5c3082e 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -970,6 +970,10 @@ def file_map(schema_map: Schema, tmpdir: str) -> str:
def project(
schema: Schema, files: List[str], expr: Optional[BooleanExpression] =
None, table_schema: Optional[Schema] = None
) -> pa.Table:
+ def _set_spec_id(datafile: DataFile) -> DataFile:
+ datafile.spec_id = 0
+ return datafile
+
return ArrowScan(
table_metadata=TableMetadataV2(
location="file://a/b/",
@@ -985,13 +989,15 @@ def project(
).to_table(
tasks=[
FileScanTask(
- DataFile.from_args(
- content=DataFileContent.DATA,
- file_path=file,
- file_format=FileFormat.PARQUET,
- partition={},
- record_count=3,
- file_size_in_bytes=3,
+ _set_spec_id(
+ DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=file,
+ file_format=FileFormat.PARQUET,
+ partition={},
+ record_count=3,
+ file_size_in_bytes=3,
+ )
)
)
for file in files
@@ -1189,7 +1195,7 @@ def test_identity_transform_column_projection(tmp_path:
str, catalog: InMemoryCa
with transaction.update_snapshot().overwrite() as update:
update.append_data_file(unpartitioned_file)
- schema = pa.schema([("other_field", pa.string()), ("partition_id",
pa.int64())])
+ schema = pa.schema([("other_field", pa.string()), ("partition_id",
pa.int32())])
assert table.scan().to_arrow() == pa.table(
{
"other_field": ["foo", "bar", "baz"],
@@ -1264,8 +1270,8 @@ def test_identity_transform_columns_projection(tmp_path:
str, catalog: InMemoryC
str(table.scan().to_arrow())
== """pyarrow.Table
field_1: string
-field_2: int64
-field_3: int64
+field_2: int32
+field_3: int32
----
field_1: [["foo"]]
field_2: [[2]]