This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0d1589a63e [python] Fix read crash after a column type change on
non-partitioned tables (#8073)
0d1589a63e is described below
commit 0d1589a63efa7b34eb6d8336ca13fddfef97a1dd
Author: Junrui Lee <[email protected]>
AuthorDate: Tue Jun 2 23:15:17 2026 +0800
[python] Fix read crash after a column type change on non-partitioned
tables (#8073)
Reading a non-partitioned table after changing an existing column's type
crashes with an Arrow schema mismatch:
```
ArrowInvalid: Schema at index 1 was different:
v: decimal128(10, 2) (file written before the type change)
vs
v: decimal128(20, 2) (file written after)
```
When a table has no partition keys and the read needs no column reordering,
`DataFileBatchReader` returns the format reader's batch as-is, so columns
from older-schema files keep their original physical types. The output type
then depends on whether the read happens to span newer-schema files, and
concatenation fails when it does.
---
.../pypaimon/read/reader/data_file_batch_reader.py | 49 +++++--
.../pypaimon/tests/py36/rest_ao_read_write_test.py | 7 +-
paimon-python/pypaimon/tests/reader_base_test.py | 7 +-
.../pypaimon/tests/schema_evolution_read_test.py | 151 +++++++++++++++++++++
4 files changed, 202 insertions(+), 12 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 64da0cc840..12e6990e13 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -78,6 +78,16 @@ class DataFileBatchReader(RecordBatchReader):
return None
if self.partition_info is None and self.index_mapping is None:
+ # A file written under an older schema (e.g. before an INT ->
BIGINT
+ # promotion or a DECIMAL precision change) yields columns in the
+ # data file's original types. Without reordering or partition
padding
+ # to rebuild the batch, those old types would otherwise leak
through
+ # here -- returning a type that depends on whether this read
happens
+ # to span newer-schema files, and failing to concatenate when it
+ # does. Align them to the current read schema, mirroring the
rebuild
+ # path below.
+ record_batch = self._align_batch_to_read_schema(
+ record_batch.schema.names, record_batch.columns)
if self.row_tracking_enabled and self.system_fields:
record_batch = self._assign_row_tracking(record_batch)
return record_batch
@@ -122,16 +132,9 @@ class DataFileBatchReader(RecordBatchReader):
inter_arrays = mapped_arrays
inter_names = mapped_names
- # to contains 'not null' property
- final_fields = []
- for i, name in enumerate(inter_names):
- array = inter_arrays[i]
- target_field = self.schema_map.get(name)
- if not target_field:
- target_field = pa.field(name, array.type)
- final_fields.append(target_field)
- final_schema = pa.schema(final_fields)
- record_batch = pa.RecordBatch.from_arrays(inter_arrays,
schema=final_schema)
+ # Rebuild the batch typed by the read schema (carries 'not null' and
+ # aligns old-schema column types).
+ record_batch = self._align_batch_to_read_schema(inter_names,
inter_arrays)
# Handle row tracking fields
if self.row_tracking_enabled and self.system_fields:
@@ -141,6 +144,32 @@ class DataFileBatchReader(RecordBatchReader):
return record_batch
+ def _align_batch_to_read_schema(self, names: List[str], arrays: list) ->
RecordBatch:
+ """Build a record batch for ``names``/``arrays`` typed by the read
schema.
+
+ Each known field is cast to the current read schema's type, which also
+ carries the 'not null' property; unknown columns keep the array's own
+ type. Columns whose type already matches are reused as-is, keeping the
+ common (non-evolution) path zero-copy.
+
+ Casts use ``safe=False`` to match Java ``CastExecutors`` semantics for
+ the read-time conversions a user-approved schema evolution implies
+ (e.g. DECIMAL scale-down or DOUBLE -> INT truncate rather than raise).
+ Evolution legality is the writer's concern (``DataTypeCasts``); the
read
+ path only materializes the result.
+ """
+ out_arrays = []
+ out_fields = []
+ for name, array in zip(names, arrays):
+ target_field = self.schema_map.get(name)
+ if target_field is None:
+ target_field = pa.field(name, array.type)
+ elif array.type != target_field.type:
+ array = array.cast(target_field.type, safe=False)
+ out_arrays.append(array)
+ out_fields.append(target_field)
+ return pa.RecordBatch.from_arrays(out_arrays,
schema=pa.schema(out_fields))
+
def _convert_descriptor_stored_blob_columns(self, record_batch:
RecordBatch) -> RecordBatch:
if isinstance(self.format_reader, FormatBlobReader):
return record_batch
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index a0a3ad37e8..a5aa88d7ca 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -241,7 +241,12 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_data = table_read.to_arrow(table_scan.plan().splits())
- self.assertEqual(actual_data, expect_data)
+ # BINARY(N) maps to variable-length binary on read (see #7518), so the
+ # fixed-size f9 column normalizes to binary; reflect that in the
expected.
+ f9_index = expect_data.schema.get_field_index('f9')
+ expected_data = expect_data.set_column(
+ f9_index, 'f9', expect_data.column('f9').cast(pa.binary()))
+ self.assertEqual(actual_data, expected_data)
# to test GenericRow ability
latest_snapshot = table.snapshot_manager().get_latest_snapshot()
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py
b/paimon-python/pypaimon/tests/reader_base_test.py
index 31d8205f88..12875cabaf 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -286,7 +286,12 @@ class ReaderBasicTest(unittest.TestCase):
# assert equal
actual_data = table_read.to_arrow(splits)
- self.assertEqual(actual_data, expect_data)
+ # BINARY(N) maps to variable-length binary on read (see #7518), so the
+ # fixed-size f9 column normalizes to binary; reflect that in the
expected.
+ f9_index = expect_data.schema.get_field_index('f9')
+ expected_data = expect_data.set_column(
+ f9_index, 'f9', expect_data.column('f9').cast(pa.binary()))
+ self.assertEqual(actual_data, expected_data)
# to test GenericRow ability
latest_snapshot = table.snapshot_manager().get_latest_snapshot()
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index a50b8ac44b..0bc5c9f142 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -25,6 +25,8 @@ import pyarrow as pa
from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.data_types import AtomicType
+from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.table_schema import TableSchema
@@ -202,6 +204,155 @@ class SchemaEvolutionReadTest(unittest.TestCase):
}, schema=pa_schema)
self.assertEqual(expected, actual)
+ def test_schema_evolution_type_promotion_unpartitioned(self):
+ # End-to-end via public API only (create -> write -> alter column type
+ # -> write -> read). A non-partitioned table whose read needs no column
+ # reordering takes the reader fast path that skips partition padding
and
+ # index remapping. The file written before the type change keeps its
old
+ # physical type, so it must be aligned to the promoted type to
+ # concatenate with the file written after; otherwise the read crashes
+ # with an Arrow schema mismatch. This is not specific to INT -> BIGINT:
+ # it applies to any type change of an existing column -- integer/float
+ # widening, DECIMAL precision/scale changes, and cross-type changes.
+ import decimal
+
+ # Each case: (name, old arrow type, new arrow type, new Paimon type,
+ # value written to the old-schema file, that same value as it should
+ # read back under the new type, value written to the new-schema file).
+ # The old write value and its expected read form differ for cross-type
+ # changes, where the old file is materialized under the new type.
+ cases = [
+ ("smallint_to_int", pa.int16(), pa.int32(), 'INT',
+ [10, 20], [10, 20], [30, 40]),
+ ("int_to_bigint", pa.int32(), pa.int64(), 'BIGINT',
+ [10, 20], [10, 20], [30, 40]),
+ ("float_to_double", pa.float32(), pa.float64(), 'DOUBLE',
+ [1.5, 2.5], [1.5, 2.5], [3.5, 4.5]),
+ ("decimal_precision_up",
+ pa.decimal128(10, 2), pa.decimal128(20, 2), 'DECIMAL(20, 2)',
+ [decimal.Decimal('1.23'), decimal.Decimal('4.56')],
+ [decimal.Decimal('1.23'), decimal.Decimal('4.56')],
+ [decimal.Decimal('7.89'), decimal.Decimal('0.12')]),
+ ("decimal_scale_up",
+ pa.decimal128(10, 2), pa.decimal128(10, 4), 'DECIMAL(10, 4)',
+ [decimal.Decimal('1.23'), decimal.Decimal('4.56')],
+ [decimal.Decimal('1.2300'), decimal.Decimal('4.5600')],
+ [decimal.Decimal('7.8901'), decimal.Decimal('0.1234')]),
+ ("int_to_string", pa.int32(), pa.string(), 'STRING',
+ [10, 20], ['10', '20'], ['a', 'b']),
+ # Lossy cross-type change: DOUBLE -> INT truncates (matches Java
+ # CastExecutors), so 1.2/2.8 read back as 1/2.
+ ("double_to_int", pa.float64(), pa.int32(), 'INT',
+ [1.2, 2.8], [1, 2], [3, 4]),
+ # Lossy DECIMAL scale-down: (10,4) -> (10,2) truncates the extra
+ # scale rather than raising.
+ ("decimal_scale_down",
+ pa.decimal128(10, 4), pa.decimal128(10, 2), 'DECIMAL(10, 2)',
+ [decimal.Decimal('1.2345'), decimal.Decimal('4.5678')],
+ [decimal.Decimal('1.23'), decimal.Decimal('4.56')],
+ [decimal.Decimal('7.89'), decimal.Decimal('0.12')]),
+ ]
+
+ for (name, old_type, new_type, new_type_str,
+ write_vals, old_read_vals, new_vals) in cases:
+ with self.subTest(case=name):
+ table_name = f'default.promo_{name}'
+ old_schema = pa.schema([('k', pa.int64()), ('v', old_type)])
+ self.catalog.create_table(
+ table_name, Schema.from_pyarrow_schema(old_schema), False)
+
+ # Write under the original schema (file stamped schema_id 0).
+ table = self.catalog.get_table(table_name)
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(pa.Table.from_pydict(
+ {'k': [1, 2], 'v': write_vals}, schema=old_schema))
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Widen column v through the catalog (new schema_id 1).
+ self.catalog.alter_table(
+ table_name,
+ [SchemaChange.update_column_type(
+ 'v', AtomicType(new_type_str))],
+ False)
+
+ # Write under the promoted schema (file stamped schema_id 1).
+ table = self.catalog.get_table(table_name)
+ new_schema = pa.schema([('k', pa.int64()), ('v', new_type)])
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(pa.Table.from_pydict(
+ {'k': [3, 4], 'v': new_vals}, schema=new_schema))
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Plain full-table read spanning both schema versions.
+ read_builder = table.new_read_builder()
+ actual = read_builder.new_read().to_arrow(
+ self._scan_table(read_builder))
+ expected = pa.Table.from_pydict(
+ {'k': [1, 2, 3, 4], 'v': old_read_vals + new_vals},
+ schema=new_schema)
+ self.assertEqual(expected, actual)
+
+ def test_schema_evolution_type_lossy_old_file_only(self):
+ # Reading ONLY old-schema files after a lossy type change (no
+ # newer-schema file in the splits). The output type must equal the
+ # current read schema regardless of which files the read spans, and the
+ # conversion must truncate to match Java CastExecutors rather than
+ # raise. (A previous fix that relied on pyarrow's safe cast crashed
+ # here on lossy evolutions.)
+ import decimal
+
+ cases = [
+ ("scale_down",
+ pa.decimal128(10, 4), pa.decimal128(10, 2), 'DECIMAL(10, 2)',
+ [decimal.Decimal('1.2345'), decimal.Decimal('4.5678')],
+ [decimal.Decimal('1.23'), decimal.Decimal('4.56')]),
+ ("double_to_int", pa.float64(), pa.int32(), 'INT',
+ [1.2, 2.8], [1, 2]),
+ ]
+
+ for name, old_type, new_type, new_type_str, write_vals, read_vals \
+ in cases:
+ with self.subTest(case=name):
+ table_name = f'default.lossy_old_only_{name}'
+ old_schema = pa.schema([('k', pa.int64()), ('v', old_type)])
+ self.catalog.create_table(
+ table_name, Schema.from_pyarrow_schema(old_schema), False)
+
+ # Write under the original schema, then change the type. No
+ # write happens afterwards, so the read sees only this file.
+ table = self.catalog.get_table(table_name)
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(pa.Table.from_pydict(
+ {'k': [1, 2], 'v': write_vals}, schema=old_schema))
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ self.catalog.alter_table(
+ table_name,
+ [SchemaChange.update_column_type(
+ 'v', AtomicType(new_type_str))],
+ False)
+
+ table = self.catalog.get_table(table_name)
+ new_schema = pa.schema([('k', pa.int64()), ('v', new_type)])
+ read_builder = table.new_read_builder()
+ actual = read_builder.new_read().to_arrow(
+ self._scan_table(read_builder))
+ expected = pa.Table.from_pydict(
+ {'k': [1, 2], 'v': read_vals}, schema=new_schema)
+ self.assertEqual(expected, actual)
+
def test_schema_evolution_with_scan_filter(self):
# schema 0
pa_schema = pa.schema([