This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d1589a63e [python] Fix read crash after a column type change on non-partitioned tables (#8073)
0d1589a63e is described below

commit 0d1589a63efa7b34eb6d8336ca13fddfef97a1dd
Author: Junrui Lee <[email protected]>
AuthorDate: Tue Jun 2 23:15:17 2026 +0800

    [python] Fix read crash after a column type change on non-partitioned tables (#8073)
    
    Reading a non-partitioned table after changing an existing column's type
    crashes with an Arrow schema mismatch:
    
    ```
    ArrowInvalid: Schema at index 1 was different:
    v: decimal128(10, 2)   (file written before the type change)
    vs
    v: decimal128(20, 2)   (file written after)
    ```
    
    When a table has no partition keys and the read needs no column
    reordering, `DataFileBatchReader` returns the format reader's batch
    as-is, so columns from older-schema files keep their original physical
    types. The output type then depends on whether the read happens to span
    newer-schema files, and concatenation fails when it does.
---
 .../pypaimon/read/reader/data_file_batch_reader.py |  49 +++++--
 .../pypaimon/tests/py36/rest_ao_read_write_test.py |   7 +-
 paimon-python/pypaimon/tests/reader_base_test.py   |   7 +-
 .../pypaimon/tests/schema_evolution_read_test.py   | 151 +++++++++++++++++++++
 4 files changed, 202 insertions(+), 12 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py 
b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 64da0cc840..12e6990e13 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -78,6 +78,16 @@ class DataFileBatchReader(RecordBatchReader):
             return None
 
         if self.partition_info is None and self.index_mapping is None:
+            # A file written under an older schema (e.g. before an INT -> 
BIGINT
+            # promotion or a DECIMAL precision change) yields columns in the
+            # data file's original types. Without reordering or partition 
padding
+            # to rebuild the batch, those old types would otherwise leak 
through
+            # here -- returning a type that depends on whether this read 
happens
+            # to span newer-schema files, and failing to concatenate when it
+            # does. Align them to the current read schema, mirroring the 
rebuild
+            # path below.
+            record_batch = self._align_batch_to_read_schema(
+                record_batch.schema.names, record_batch.columns)
             if self.row_tracking_enabled and self.system_fields:
                 record_batch = self._assign_row_tracking(record_batch)
             return record_batch
@@ -122,16 +132,9 @@ class DataFileBatchReader(RecordBatchReader):
             inter_arrays = mapped_arrays
             inter_names = mapped_names
 
-        # to contains 'not null' property
-        final_fields = []
-        for i, name in enumerate(inter_names):
-            array = inter_arrays[i]
-            target_field = self.schema_map.get(name)
-            if not target_field:
-                target_field = pa.field(name, array.type)
-            final_fields.append(target_field)
-        final_schema = pa.schema(final_fields)
-        record_batch = pa.RecordBatch.from_arrays(inter_arrays, 
schema=final_schema)
+        # Rebuild the batch typed by the read schema (carries 'not null' and
+        # aligns old-schema column types).
+        record_batch = self._align_batch_to_read_schema(inter_names, 
inter_arrays)
 
         # Handle row tracking fields
         if self.row_tracking_enabled and self.system_fields:
@@ -141,6 +144,32 @@ class DataFileBatchReader(RecordBatchReader):
 
         return record_batch
 
+    def _align_batch_to_read_schema(self, names: List[str], arrays: list) -> 
RecordBatch:
+        """Build a record batch for ``names``/``arrays`` typed by the read 
schema.
+
+        Each known field is cast to the current read schema's type, which also
+        carries the 'not null' property; unknown columns keep the array's own
+        type. Columns whose type already matches are reused as-is, keeping the
+        common (non-evolution) path zero-copy.
+
+        Casts use ``safe=False`` to match Java ``CastExecutors`` semantics for
+        the read-time conversions a user-approved schema evolution implies
+        (e.g. DECIMAL scale-down or DOUBLE -> INT truncate rather than raise).
+        Evolution legality is the writer's concern (``DataTypeCasts``); the 
read
+        path only materializes the result.
+        """
+        out_arrays = []
+        out_fields = []
+        for name, array in zip(names, arrays):
+            target_field = self.schema_map.get(name)
+            if target_field is None:
+                target_field = pa.field(name, array.type)
+            elif array.type != target_field.type:
+                array = array.cast(target_field.type, safe=False)
+            out_arrays.append(array)
+            out_fields.append(target_field)
+        return pa.RecordBatch.from_arrays(out_arrays, 
schema=pa.schema(out_fields))
+
     def _convert_descriptor_stored_blob_columns(self, record_batch: 
RecordBatch) -> RecordBatch:
         if isinstance(self.format_reader, FormatBlobReader):
             return record_batch
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py 
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index a0a3ad37e8..a5aa88d7ca 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -241,7 +241,12 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         table_scan = read_builder.new_scan()
         table_read = read_builder.new_read()
         actual_data = table_read.to_arrow(table_scan.plan().splits())
-        self.assertEqual(actual_data, expect_data)
+        # BINARY(N) maps to variable-length binary on read (see #7518), so the
+        # fixed-size f9 column normalizes to binary; reflect that in the 
expected.
+        f9_index = expect_data.schema.get_field_index('f9')
+        expected_data = expect_data.set_column(
+            f9_index, 'f9', expect_data.column('f9').cast(pa.binary()))
+        self.assertEqual(actual_data, expected_data)
 
         # to test GenericRow ability
         latest_snapshot = table.snapshot_manager().get_latest_snapshot()
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py 
b/paimon-python/pypaimon/tests/reader_base_test.py
index 31d8205f88..12875cabaf 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -286,7 +286,12 @@ class ReaderBasicTest(unittest.TestCase):
 
         # assert equal
         actual_data = table_read.to_arrow(splits)
-        self.assertEqual(actual_data, expect_data)
+        # BINARY(N) maps to variable-length binary on read (see #7518), so the
+        # fixed-size f9 column normalizes to binary; reflect that in the 
expected.
+        f9_index = expect_data.schema.get_field_index('f9')
+        expected_data = expect_data.set_column(
+            f9_index, 'f9', expect_data.column('f9').cast(pa.binary()))
+        self.assertEqual(actual_data, expected_data)
 
         # to test GenericRow ability
         latest_snapshot = table.snapshot_manager().get_latest_snapshot()
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py 
b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index a50b8ac44b..0bc5c9f142 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -25,6 +25,8 @@ import pyarrow as pa
 
 from pypaimon import CatalogFactory, Schema
 
+from pypaimon.schema.data_types import AtomicType
+from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.schema.table_schema import TableSchema
 
@@ -202,6 +204,155 @@ class SchemaEvolutionReadTest(unittest.TestCase):
         }, schema=pa_schema)
         self.assertEqual(expected, actual)
 
+    def test_schema_evolution_type_promotion_unpartitioned(self):
+        # End-to-end via public API only (create -> write -> alter column type
+        # -> write -> read). A non-partitioned table whose read needs no column
+        # reordering takes the reader fast path that skips partition padding 
and
+        # index remapping. The file written before the type change keeps its 
old
+        # physical type, so it must be aligned to the promoted type to
+        # concatenate with the file written after; otherwise the read crashes
+        # with an Arrow schema mismatch. This is not specific to INT -> BIGINT:
+        # it applies to any type change of an existing column -- integer/float
+        # widening, DECIMAL precision/scale changes, and cross-type changes.
+        import decimal
+
+        # Each case: (name, old arrow type, new arrow type, new Paimon type,
+        # value written to the old-schema file, that same value as it should
+        # read back under the new type, value written to the new-schema file).
+        # The old write value and its expected read form differ for cross-type
+        # changes, where the old file is materialized under the new type.
+        cases = [
+            ("smallint_to_int", pa.int16(), pa.int32(), 'INT',
+             [10, 20], [10, 20], [30, 40]),
+            ("int_to_bigint", pa.int32(), pa.int64(), 'BIGINT',
+             [10, 20], [10, 20], [30, 40]),
+            ("float_to_double", pa.float32(), pa.float64(), 'DOUBLE',
+             [1.5, 2.5], [1.5, 2.5], [3.5, 4.5]),
+            ("decimal_precision_up",
+             pa.decimal128(10, 2), pa.decimal128(20, 2), 'DECIMAL(20, 2)',
+             [decimal.Decimal('1.23'), decimal.Decimal('4.56')],
+             [decimal.Decimal('1.23'), decimal.Decimal('4.56')],
+             [decimal.Decimal('7.89'), decimal.Decimal('0.12')]),
+            ("decimal_scale_up",
+             pa.decimal128(10, 2), pa.decimal128(10, 4), 'DECIMAL(10, 4)',
+             [decimal.Decimal('1.23'), decimal.Decimal('4.56')],
+             [decimal.Decimal('1.2300'), decimal.Decimal('4.5600')],
+             [decimal.Decimal('7.8901'), decimal.Decimal('0.1234')]),
+            ("int_to_string", pa.int32(), pa.string(), 'STRING',
+             [10, 20], ['10', '20'], ['a', 'b']),
+            # Lossy cross-type change: DOUBLE -> INT truncates (matches Java
+            # CastExecutors), so 1.2/2.8 read back as 1/2.
+            ("double_to_int", pa.float64(), pa.int32(), 'INT',
+             [1.2, 2.8], [1, 2], [3, 4]),
+            # Lossy DECIMAL scale-down: (10,4) -> (10,2) truncates the extra
+            # scale rather than raising.
+            ("decimal_scale_down",
+             pa.decimal128(10, 4), pa.decimal128(10, 2), 'DECIMAL(10, 2)',
+             [decimal.Decimal('1.2345'), decimal.Decimal('4.5678')],
+             [decimal.Decimal('1.23'), decimal.Decimal('4.56')],
+             [decimal.Decimal('7.89'), decimal.Decimal('0.12')]),
+        ]
+
+        for (name, old_type, new_type, new_type_str,
+             write_vals, old_read_vals, new_vals) in cases:
+            with self.subTest(case=name):
+                table_name = f'default.promo_{name}'
+                old_schema = pa.schema([('k', pa.int64()), ('v', old_type)])
+                self.catalog.create_table(
+                    table_name, Schema.from_pyarrow_schema(old_schema), False)
+
+                # Write under the original schema (file stamped schema_id 0).
+                table = self.catalog.get_table(table_name)
+                write_builder = table.new_batch_write_builder()
+                table_write = write_builder.new_write()
+                table_commit = write_builder.new_commit()
+                table_write.write_arrow(pa.Table.from_pydict(
+                    {'k': [1, 2], 'v': write_vals}, schema=old_schema))
+                table_commit.commit(table_write.prepare_commit())
+                table_write.close()
+                table_commit.close()
+
+                # Widen column v through the catalog (new schema_id 1).
+                self.catalog.alter_table(
+                    table_name,
+                    [SchemaChange.update_column_type(
+                        'v', AtomicType(new_type_str))],
+                    False)
+
+                # Write under the promoted schema (file stamped schema_id 1).
+                table = self.catalog.get_table(table_name)
+                new_schema = pa.schema([('k', pa.int64()), ('v', new_type)])
+                write_builder = table.new_batch_write_builder()
+                table_write = write_builder.new_write()
+                table_commit = write_builder.new_commit()
+                table_write.write_arrow(pa.Table.from_pydict(
+                    {'k': [3, 4], 'v': new_vals}, schema=new_schema))
+                table_commit.commit(table_write.prepare_commit())
+                table_write.close()
+                table_commit.close()
+
+                # Plain full-table read spanning both schema versions.
+                read_builder = table.new_read_builder()
+                actual = read_builder.new_read().to_arrow(
+                    self._scan_table(read_builder))
+                expected = pa.Table.from_pydict(
+                    {'k': [1, 2, 3, 4], 'v': old_read_vals + new_vals},
+                    schema=new_schema)
+                self.assertEqual(expected, actual)
+
+    def test_schema_evolution_type_lossy_old_file_only(self):
+        # Reading ONLY old-schema files after a lossy type change (no
+        # newer-schema file in the splits). The output type must equal the
+        # current read schema regardless of which files the read spans, and the
+        # conversion must truncate to match Java CastExecutors rather than
+        # raise. (A previous fix that relied on pyarrow's safe cast crashed
+        # here on lossy evolutions.)
+        import decimal
+
+        cases = [
+            ("scale_down",
+             pa.decimal128(10, 4), pa.decimal128(10, 2), 'DECIMAL(10, 2)',
+             [decimal.Decimal('1.2345'), decimal.Decimal('4.5678')],
+             [decimal.Decimal('1.23'), decimal.Decimal('4.56')]),
+            ("double_to_int", pa.float64(), pa.int32(), 'INT',
+             [1.2, 2.8], [1, 2]),
+        ]
+
+        for name, old_type, new_type, new_type_str, write_vals, read_vals \
+                in cases:
+            with self.subTest(case=name):
+                table_name = f'default.lossy_old_only_{name}'
+                old_schema = pa.schema([('k', pa.int64()), ('v', old_type)])
+                self.catalog.create_table(
+                    table_name, Schema.from_pyarrow_schema(old_schema), False)
+
+                # Write under the original schema, then change the type. No
+                # write happens afterwards, so the read sees only this file.
+                table = self.catalog.get_table(table_name)
+                write_builder = table.new_batch_write_builder()
+                table_write = write_builder.new_write()
+                table_commit = write_builder.new_commit()
+                table_write.write_arrow(pa.Table.from_pydict(
+                    {'k': [1, 2], 'v': write_vals}, schema=old_schema))
+                table_commit.commit(table_write.prepare_commit())
+                table_write.close()
+                table_commit.close()
+
+                self.catalog.alter_table(
+                    table_name,
+                    [SchemaChange.update_column_type(
+                        'v', AtomicType(new_type_str))],
+                    False)
+
+                table = self.catalog.get_table(table_name)
+                new_schema = pa.schema([('k', pa.int64()), ('v', new_type)])
+                read_builder = table.new_read_builder()
+                actual = read_builder.new_read().to_arrow(
+                    self._scan_table(read_builder))
+                expected = pa.Table.from_pydict(
+                    {'k': [1, 2], 'v': read_vals}, schema=new_schema)
+                self.assertEqual(expected, actual)
+
     def test_schema_evolution_with_scan_filter(self):
         # schema 0
         pa_schema = pa.schema([

Reply via email to