This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new cf8b46e7 fix: allow reading pyarrow timestamptz as timestamp (#2708)
cf8b46e7 is described below
commit cf8b46e7575d512ed42d57562036b1da91b26d57
Author: Kevin Liu <[email protected]>
AuthorDate: Thu Jan 29 03:48:22 2026 -0500
fix: allow reading pyarrow timestamptz as timestamp (#2708)
# Rationale for this change
Closes #2663
Relates to #2333, which allowed reading a PyArrow timestamp as an Iceberg
timestamptz.
This PR allows a PyArrow timestamptz to be read as an Iceberg timestamp.
Although this projection does not conform to the Iceberg spec, it aligns
PyIceberg's behavior with Spark when reading mismatched types (timestamptz
as Iceberg timestamp, and timestamp as Iceberg timestamptz).
**The write path remains strict and will reject this type mismatch.**
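For illustration, here is a minimal sketch of the resulting behavior, adapted from the tests added in this PR. Note that `_to_requested_schema` is an internal helper rather than public API, and the field name and values are hypothetical:

```python
from datetime import datetime, timezone

import pyarrow as pa

from pyiceberg.io.pyarrow import _to_requested_schema
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, TimestampType, TimestamptzType

# The file was written as timestamptz (UTC) ...
file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
batch = pa.record_batch(
    [pa.array([datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc)], type=pa.timestamp("us", tz="UTC"))],
    names=["ts_field"],
)

# ... but the table schema expects a timestamp without a timezone.
table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))

# Read path: the mismatch is tolerated and the UTC zone is dropped.
result = _to_requested_schema(table_schema, file_schema, batch, allow_timestamp_tz_mismatch=True)
assert result.schema.field("ts_field").type == pa.timestamp("us")

# Write path: keeps the default allow_timestamp_tz_mismatch=False, so the same
# projection raises ValueError: Unsupported schema projection from
# timestamp[us, tz=UTC] to timestamp[us]
```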
## Are these changes tested?
Yes
## Are there any user-facing changes?
No
---
pyiceberg/io/pyarrow.py | 23 +++++++++--
tests/io/test_pyarrow.py | 101 +++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 120 insertions(+), 4 deletions(-)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 2360cf20..6a50e24d 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1656,6 +1656,7 @@ def _task_to_record_batches(
current_batch,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
projected_missing_fields=projected_missing_fields,
+ allow_timestamp_tz_mismatch=True,
)
@@ -1849,13 +1850,18 @@ def _to_requested_schema(
downcast_ns_timestamp_to_us: bool = False,
include_field_ids: bool = False,
projected_missing_fields: dict[int, Any] = EMPTY_DICT,
+ allow_timestamp_tz_mismatch: bool = False,
) -> pa.RecordBatch:
# We could reuse some of these visitors
struct_array = visit_with_partner(
requested_schema,
batch,
ArrowProjectionVisitor(
- file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
+ file_schema,
+ downcast_ns_timestamp_to_us,
+ include_field_ids,
+ projected_missing_fields=projected_missing_fields,
+ allow_timestamp_tz_mismatch=allow_timestamp_tz_mismatch,
),
ArrowAccessor(file_schema),
)
@@ -1868,6 +1874,7 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
_downcast_ns_timestamp_to_us: bool
_use_large_types: bool | None
_projected_missing_fields: dict[int, Any]
+ _allow_timestamp_tz_mismatch: bool
def __init__(
self,
@@ -1876,12 +1883,16 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
include_field_ids: bool = False,
use_large_types: bool | None = None,
projected_missing_fields: dict[int, Any] = EMPTY_DICT,
+ allow_timestamp_tz_mismatch: bool = False,
) -> None:
self._file_schema = file_schema
self._include_field_ids = include_field_ids
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
self._use_large_types = use_large_types
self._projected_missing_fields = projected_missing_fields
+ # When True, allows projecting timestamptz (UTC) to timestamp (no tz).
+ # Allowed for reading (aligns with Spark); disallowed for writing to enforce Iceberg spec's strict typing.
+ self._allow_timestamp_tz_mismatch = allow_timestamp_tz_mismatch
if use_large_types is not None:
deprecation_message(
@@ -1896,16 +1907,19 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
if field.field_type.is_primitive:
if (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
if field.field_type == TimestampType():
- # Downcasting of nanoseconds to microseconds
+ source_tz_compatible = values.type.tz is None or (
+ self._allow_timestamp_tz_mismatch and values.type.tz in UTC_ALIASES
+ )
if (
pa.types.is_timestamp(target_type)
and not target_type.tz
and pa.types.is_timestamp(values.type)
- and not values.type.tz
+ and source_tz_compatible
):
+ # Downcasting of nanoseconds to microseconds
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
return values.cast(target_type, safe=False)
- elif target_type.unit == "us" and values.type.unit in {"s", "ms"}:
+ elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
return values.cast(target_type)
raise ValueError(f"Unsupported schema projection from
{values.type} to {target_type}")
elif field.field_type == TimestamptzType():
@@ -1915,6 +1929,7 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
and pa.types.is_timestamp(values.type)
and (values.type.tz in UTC_ALIASES or values.type.tz is None)
):
+ # Downcasting of nanoseconds to microseconds
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
return values.cast(target_type, safe=False)
elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 89c11435..04bc3ecf 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -81,6 +81,7 @@ from pyiceberg.io.pyarrow import (
expression_to_pyarrow,
parquet_path_to_id_mapping,
schema_to_pyarrow,
+ write_file,
)
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -2725,6 +2726,106 @@ def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None:
assert expected.equals(actual_result)
+def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None:
+ # file is written with timestamp with timezone
+ file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+ batch = pa.record_batch(
+ [
+ pa.array(
+ [
+ datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
+ datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
+ ],
+ type=pa.timestamp("us", tz="UTC"),
+ )
+ ],
+ names=["ts_field"],
+ )
+
+ # table schema expects timestamp without timezone
+ table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+
+ # allow_timestamp_tz_mismatch=True enables reading timestamptz as timestamp
+ actual_result = _to_requested_schema(
+ table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=True
+ )
+ expected = pa.record_batch(
+ [
+ pa.array(
+ [
+ datetime(2025, 8, 14, 12, 0, 0),
+ datetime(2025, 8, 14, 13, 0, 0),
+ ],
+ type=pa.timestamp("us"),
+ )
+ ],
+ names=["ts_field"],
+ )
+
+ # expect actual_result to have no timezone
+ assert expected.equals(actual_result)
+
+
+def test__to_requested_schema_timestamptz_to_timestamp_write_rejects() -> None:
+ """Test that the write path (default) rejects timestamptz to timestamp
casting.
+
+ This ensures we enforce the Iceberg spec distinction between timestamp and timestamptz on writes,
+ while the read path can be more permissive (like Spark) via allow_timestamp_tz_mismatch=True.
+ """
+ # file is written with timestamp with timezone
+ file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+ batch = pa.record_batch(
+ [
+ pa.array(
+ [
+ datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
+ datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
+ ],
+ type=pa.timestamp("us", tz="UTC"),
+ )
+ ],
+ names=["ts_field"],
+ )
+
+ # table schema expects timestamp without timezone
+ table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+
+ # allow_timestamp_tz_mismatch=False (default, used in write path) should raise
+ with pytest.raises(ValueError, match="Unsupported schema projection"):
+ _to_requested_schema(
+ table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=False
+ )
+
+
+def test_write_file_rejects_timestamptz_to_timestamp(tmp_path: Path) -> None:
+ """Test that write_file rejects writing timestamptz data to a timestamp
column."""
+ from pyiceberg.table import WriteTask
+
+ # Table expects timestamp (no tz), but data has timestamptz
+ table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+ task_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+
+ arrow_data = pa.table({"ts_field": [datetime(2025, 8, 14, 12, 0, 0,
tzinfo=timezone.utc)]})
+
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}",
+ last_column_id=1,
+ format_version=2,
+ schemas=[table_schema],
+ partition_specs=[PartitionSpec()],
+ )
+
+ task = WriteTask(
+ write_uuid=uuid.uuid4(),
+ task_id=0,
+ record_batches=arrow_data.to_batches(),
+ schema=task_schema,
+ )
+
+ with pytest.raises(ValueError, match="Unsupported schema projection"):
+ list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task])))
+
+
def test__to_requested_schema_timestamps(
arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
arrow_table_with_all_timestamp_precisions: pa.Table,