This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cf5db9970a [python] Align read-time schema evolution by field id 
(#8126)
cf5db9970a is described below

commit cf5db9970aa084302be8be4155be694f54979bc0
Author: chaoyang <[email protected]>
AuthorDate: Sat Jun 6 16:57:03 2026 +0800

    [python] Align read-time schema evolution by field id (#8126)
    
    Read-time schema evolution matched a data file's columns to the read
    schema by **name** rather than by **field id**. So after a column rename
    the old file's column became unreachable and its rows read `NULL`, and
    dropping a column then re-adding the same name revived the dropped
    column's stale data.
    
    Align per file by field id instead: read each file under its own field
    names/types and normalize to the latest read schema by id — a rename
    follows the id, and a re-added name gets a fresh id and pads `NULL`.
    Non-evolving reads stay zero-copy.
    
    Adds regression tests for rename / drop-then-readd / swap across
    append-only, primary-key (merge engines) and nested top-level columns.
---
 .../pypaimon/read/reader/data_file_batch_reader.py |  56 +++-
 paimon-python/pypaimon/read/split_read.py          | 101 +++++--
 .../tests/schema_evolution_nested_read_test.py     | 317 +++++++++++++++++++
 .../tests/schema_evolution_pk_read_test.py         | 335 +++++++++++++++++++++
 .../pypaimon/tests/schema_evolution_read_test.py   | 289 +++++++++++++++++-
 5 files changed, 1073 insertions(+), 25 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py 
b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 21d1b2a911..e3bd511439 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -40,7 +40,9 @@ class DataFileBatchReader(RecordBatchReader):
                  row_tracking_enabled: bool,
                  system_fields: dict,
                  file_io: Optional[FileIO] = None,
-                 row_id_offsets: Optional[List[int]] = None):
+                 row_id_offsets: Optional[List[int]] = None,
+                 file_data_fields: Optional[List[DataField]] = None,
+                 target_data_fields: Optional[List[DataField]] = None):
         self.format_reader = format_reader
         self.index_mapping = index_mapping
         self.partition_info = partition_info
@@ -53,6 +55,57 @@ class DataFileBatchReader(RecordBatchReader):
         self.max_sequence_number = max_sequence_number
         self.system_fields = system_fields
         self.file_io = file_io
+        # Per-file field-id normalization: map the physically-read columns
+        # (the file's own field order/names) onto the latest read target by
+        # field id, padding missing ids with NULL. ``None`` when there is no
+        # evolution to reconcile (identity) -- the common path stays zero-copy.
+        self._normalize_positions, self._normalize_names = \
+            self._build_normalize_plan(file_data_fields, target_data_fields)
+
+    @staticmethod
+    def _build_normalize_plan(file_data_fields, target_data_fields):
+        """Build a per-file field-id alignment plan.
+
+        Returns ``(positions, names)`` where ``positions[i]`` is the column
+        index in the physically-read batch carrying ``target_data_fields[i]``
+        (matched by field id), or -1 if the file does not contain that id (pad
+        NULL). ``names[i]`` is the latest target name. Returns ``(None, None)``
+        when the plan is the identity (no evolution), so the caller skips
+        normalization and stays zero-copy.
+        """
+        if file_data_fields is None or target_data_fields is None:
+            return None, None
+        file_id_to_pos = {f.id: i for i, f in enumerate(file_data_fields)}
+        positions = []
+        names = []
+        # Identity only when every target maps to the same physical position
+        # AND already carries the same name -- a rename keeps the position but
+        # changes the name, which still requires a relabel pass.
+        identity = len(file_data_fields) == len(target_data_fields)
+        for i, target in enumerate(target_data_fields):
+            pos = file_id_to_pos.get(target.id, -1)
+            positions.append(pos)
+            names.append(target.name)
+            if pos != i or (pos >= 0 and file_data_fields[pos].name != 
target.name):
+                identity = False
+        if identity:
+            return None, None
+        return positions, names
+
+    def _normalize_batch(self, record_batch: RecordBatch) -> RecordBatch:
+        """Reorder/pad the physically-read batch onto the latest read target by
+        field id, and relabel columns to the latest names. Missing ids become
+        all-NULL columns; types are reconciled later by 
_align_batch_to_read_schema."""
+        if self._normalize_positions is None:
+            return record_batch
+        num_rows = record_batch.num_rows
+        arrays = []
+        for pos in self._normalize_positions:
+            if pos < 0:
+                arrays.append(pa.nulls(num_rows))
+            else:
+                arrays.append(record_batch.column(pos))
+        return pa.RecordBatch.from_arrays(arrays, names=self._normalize_names)
 
     def read_arrow_batch(self, start_idx=None, end_idx=None) -> 
Optional[RecordBatch]:
         if isinstance(self.format_reader, FormatBlobReader):
@@ -61,6 +114,7 @@ class DataFileBatchReader(RecordBatchReader):
             record_batch = self.format_reader.read_arrow_batch()
         if record_batch is None:
             return None
+        record_batch = self._normalize_batch(record_batch)
 
         if self.partition_info is None and self.index_mapping is None:
             # A file written under an older schema (e.g. before an INT -> 
BIGINT
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index fe76527431..543c858935 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -217,14 +217,27 @@ class SplitRead(ABC):
         nested_path_by_name = self._nested_path_by_name()
         has_nested = nested_path_by_name is not None
 
-        # Cover both the merge-internal aliases (``_KEY_id``) and the
-        # bare user-facing PK name (``id``) the file actually stores.
-        name_to_field: Dict[str, DataField] = {f.name: f for f in 
self.read_fields}
-        _, _trimmed_lookup_fields = self._get_trimmed_fields(
-            self._get_read_data_fields(), self._get_all_data_fields()
-        )
-        for f in _trimmed_lookup_fields:
-            name_to_field.setdefault(f.name, f)
+        # Field-id based per-file read (non-nested): select the file's OWN
+        # physical fields by field id and read them under the file's original
+        # names/types. A normalize step in DataFileBatchReader then aligns the
+        # batch to the latest read schema by field id (not by name), so a
+        # rename follows the id and a dropped-then-readded name cannot revive
+        # stale data. Nested-projection reads stay on the legacy name path.
+        file_read_fields = None if has_nested else self._file_read_fields(file)
+        target_fields = None if has_nested else self._target_read_fields()
+        if file_read_fields is not None:
+            read_file_fields = [f.name for f in file_read_fields]
+            name_to_field: Dict[str, DataField] = {
+                f.name: f for f in file_read_fields}
+        else:
+            # Cover both the merge-internal aliases (``_KEY_id``) and the
+            # bare user-facing PK name (``id``) the file actually stores.
+            name_to_field = {f.name: f for f in self.read_fields}
+            _, _trimmed_lookup_fields = self._get_trimmed_fields(
+                self._get_read_data_fields(), self._get_all_data_fields()
+            )
+            for f in _trimmed_lookup_fields:
+                name_to_field.setdefault(f.name, f)
 
         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
@@ -342,7 +355,9 @@ class SplitRead(ABC):
                 row_tracking_enabled,
                 system_fields,
                 file_io=self.table.file_io,
-                row_id_offsets=row_indices)
+                row_id_offsets=row_indices,
+                file_data_fields=file_read_fields,
+                target_data_fields=target_fields)
         else:
             reader = DataFileBatchReader(
                 format_reader,
@@ -355,7 +370,9 @@ class SplitRead(ABC):
                 row_tracking_enabled,
                 system_fields,
                 file_io=self.table.file_io,
-                row_id_offsets=row_indices)
+                row_id_offsets=row_indices,
+                file_data_fields=file_read_fields,
+                target_data_fields=target_fields)
 
         # For non-Vortex formats, wrap with RowIdFilterRecordBatchReader
         if row_ranges is not None and row_indices is None:
@@ -401,16 +418,54 @@ class SplitRead(ABC):
         return self.schema_id_2_fields[key]
 
     @abstractmethod
+    def _all_data_fields_from(self, fields: List[DataField]) -> 
List[DataField]:
+        """Apply this split-read's data-field shaping (row-tracking / kv
+        wrapping) to the given base ``fields``. Called both for the latest
+        table schema and for an older file schema."""
+
     def _get_all_data_fields(self):
-        """Get all data fields"""
+        return self._all_data_fields_from(self.table.fields)
 
     def _get_read_data_fields(self):
-        read_data_fields = []
+        return self._read_data_fields_from(self._get_all_data_fields())
+
+    def _read_data_fields_from(self, all_data_fields):
         read_field_ids = {field.id for field in self.read_fields}
-        for data_field in self._get_all_data_fields():
-            if data_field.id in read_field_ids:
-                read_data_fields.append(data_field)
-        return read_data_fields
+        return [f for f in all_data_fields if f.id in read_field_ids]
+
+    def _final_data_fields_from(self, all_data_fields: List[DataField]) -> 
List[DataField]:
+        """The per-position target fields a batch must end up as: trimmed for
+        kv (``_KEY_*``) duplicates and stripped of partition columns. The
+        DataField analogue of ``_get_final_read_data_fields()``. Called with
+        the latest-schema fields it yields the read target (latest names +
+        types); called with a file's fields it yields what to physically read
+        from that file (the file's own names + types)."""
+        _, trimmed = self._get_trimmed_fields(
+            self._read_data_fields_from(all_data_fields), all_data_fields)
+        partition_keys = self.table.partition_keys
+        if not partition_keys:
+            return list(trimmed)
+        return [f for f in trimmed if f.name not in partition_keys]
+
+    def _target_read_fields(self) -> Optional[List[DataField]]:
+        """Latest-schema target fields (names + types) that a normalized batch
+        must align to, in order. None for nested-projection reads (kept on the
+        legacy name-based path)."""
+        if self._nested_path_by_name() is not None:
+            return None
+        return self._final_data_fields_from(self._get_all_data_fields())
+
+    def _file_read_fields(self, file: DataFileMeta) -> 
Optional[List[DataField]]:
+        """The fields to physically read from ``file``, in the file's own
+        names/types, selected by field id against the read set. None for
+        nested-projection reads."""
+        if self._nested_path_by_name() is not None:
+            return None
+        file_schema = self.table.schema_manager.get_schema(file.schema_id)
+        if file_schema is None:
+            return None
+        return self._final_data_fields_from(
+            self._all_data_fields_from(file_schema.fields))
 
     def _create_key_value_fields(self, value_field: List[DataField]):
         all_fields: List[DataField] = self.table.fields
@@ -629,10 +684,10 @@ class RawFileSplitRead(SplitRead):
                 reader = LimitedRecordBatchReader(reader, self.limit)
         return reader
 
-    def _get_all_data_fields(self):
+    def _all_data_fields_from(self, fields):
         if self.row_tracking_enabled:
-            return SpecialFields.row_type_with_row_tracking(self.table.fields)
-        return self.table.fields
+            return SpecialFields.row_type_with_row_tracking(fields)
+        return fields
 
 
 class MergeFileSplitRead(SplitRead):
@@ -760,8 +815,8 @@ class MergeFileSplitRead(SplitRead):
             reader = LimitedRecordReader(reader, self.limit)
         return reader
 
-    def _get_all_data_fields(self):
-        return self._create_key_value_fields(self.table.fields)
+    def _all_data_fields_from(self, fields):
+        return self._create_key_value_fields(fields)
 
 
 class DataEvolutionSplitRead(SplitRead):
@@ -1114,5 +1169,5 @@ class DataEvolutionSplitRead(SplitRead):
         field_ids.append(SpecialFields.SEQUENCE_NUMBER.id)
         return field_ids
 
-    def _get_all_data_fields(self):
-        return SpecialFields.row_type_with_row_tracking(self.table.fields)
+    def _all_data_fields_from(self, fields):
+        return SpecialFields.row_type_with_row_tracking(fields)
diff --git a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py 
b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py
new file mode 100644
index 0000000000..c9147b823f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Schema-evolution read tests for nested (struct/array/map) types.
+
+Two layers are covered:
+
+* Whole-column evolution of a top-level struct/array/map column
+  (add / drop / rename / projection) -- aligned by the column's field id.
+* Sub-field evolution INSIDE a struct (add/rename/update-type/drop a nested
+  field via a dotted ``field_names`` path) -- this is NOT implemented:
+  ``schema_manager`` only operates on the top-level ``field_names[-1]``.
+  ``SchemaEvolutionNestedGapTest`` locks in the current behaviour with
+  explicit assertions so the gap is documented and any future fix is
+  noticed.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.data_types import AtomicType, PyarrowFieldParser
+from pypaimon.schema.schema_change import SchemaChange
+
+
+def _paimon_type(pa_type, nullable=True):
+    return PyarrowFieldParser.to_paimon_type(pa_type, nullable)
+
+
+_MV_PA = pa.struct([('latest_version', pa.int64()), ('latest_value', 
pa.string())])
+
+
+class _NestedBase(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create(self, name, pa_schema, primary_keys=None, bucket='-1'):
+        options = {'bucket': bucket}
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=list(primary_keys) if primary_keys else None,
+            options=options,
+        )
+        full = 'default.{}'.format(name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full)
+
+    def _write(self, table, pa_table):
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa_table)
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _read_sorted(self, table, key='id', projection=None):
+        rb = table.new_read_builder()
+        if projection is not None:
+            rb = rb.with_projection(projection)
+        splits = rb.new_scan().plan().splits()
+        rows = rb.new_read().to_arrow(splits).to_pylist() if splits else []
+        return sorted(rows, key=lambda r: r[key])
+
+
+class SchemaEvolutionNestedReadTest(_NestedBase):
+    """Top-level struct/array/map column evolution (works)."""
+
+    # -- C1: add a new struct top-level column ---------------------------
+
+    def test_add_struct_column(self):
+        s0 = pa.schema([('id', pa.int64()), ('val', pa.string())])
+        table = self._create('nested_add_struct', s0)
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1, 2], 'val': ['x', 'y']}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.nested_add_struct',
+            [SchemaChange.add_column('mv', _paimon_type(_MV_PA))], False)
+        table = self.catalog.get_table('default.nested_add_struct')
+        s1 = pa.schema([('id', pa.int64()), ('val', pa.string()), ('mv', 
_MV_PA)])
+        self._write(table, pa.Table.from_pylist(
+            [{'id': 3, 'val': 'z',
+              'mv': {'latest_version': 300, 'latest_value': 'c'}}], schema=s1))
+
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'val': 'x', 'mv': None},
+            {'id': 2, 'val': 'y', 'mv': None},
+            {'id': 3, 'val': 'z',
+             'mv': {'latest_version': 300, 'latest_value': 'c'}}])
+
+    # -- C2: drop a struct top-level column ------------------------------
+
+    def test_drop_struct_column(self):
+        s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', 
pa.string())])
+        table = self._create('nested_drop_struct', s0)
+        self._write(table, pa.Table.from_pylist([
+            {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 
'val': 'x'},
+            {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'}, 
'val': 'y'},
+        ], schema=s0))
+
+        self.catalog.alter_table(
+            'default.nested_drop_struct', [SchemaChange.drop_column('mv')], 
False)
+        table = self.catalog.get_table('default.nested_drop_struct')
+        s1 = pa.schema([('id', pa.int64()), ('val', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [3], 'val': ['z']}, schema=s1))
+
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'val': 'x'}, {'id': 2, 'val': 'y'}, {'id': 3, 'val': 
'z'}])
+
+    # -- C4: nested leaf projection under evolution ----------------------
+
+    def test_nested_projection_after_add_column(self):
+        s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', 
pa.string())])
+        table = self._create('nested_proj_evo', s0)
+        self._write(table, pa.Table.from_pylist([
+            {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 
'val': 'x'},
+        ], schema=s0))
+
+        self.catalog.alter_table(
+            'default.nested_proj_evo',
+            [SchemaChange.add_column('extra', AtomicType('STRING'))], False)
+        table = self.catalog.get_table('default.nested_proj_evo')
+        s1 = pa.schema([('id', pa.int64()), ('mv', _MV_PA),
+                        ('val', pa.string()), ('extra', pa.string())])
+        self._write(table, pa.Table.from_pylist([
+            {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'},
+             'val': 'y', 'extra': 'E'},
+        ], schema=s1))
+
+        rows = self._read_sorted(
+            table, projection=['id', 'mv.latest_version', 'extra'])
+        self.assertEqual(rows, [
+            {'id': 1, 'mv_latest_version': 100, 'extra': None},
+            {'id': 2, 'mv_latest_version': 200, 'extra': 'E'}])
+
+    # -- C5: PK table struct column + add column merge read --------------
+
+    def test_pk_struct_add_column_merge(self):
+        s0 = pa.schema([pa.field('id', pa.int64(), nullable=False),
+                        ('mv', _MV_PA)])
+        table = self._create('nested_pk_struct', s0,
+                             primary_keys=['id'], bucket='1')
+        rows0 = [{'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}}]
+        self._write(table, pa.Table.from_pylist(rows0, schema=s0))
+
+        self.catalog.alter_table(
+            'default.nested_pk_struct',
+            [SchemaChange.add_column('w', AtomicType('STRING'))], False)
+        table = self.catalog.get_table('default.nested_pk_struct')
+        s1 = pa.schema([pa.field('id', pa.int64(), nullable=False),
+                        ('mv', _MV_PA), ('w', pa.string())])
+        self._write(table, pa.Table.from_pylist(
+            [{'id': 1, 'mv': {'latest_version': 101, 'latest_value': 'a2'},
+              'w': 'W'}], schema=s1))
+
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'mv': {'latest_version': 101, 'latest_value': 'a2'},
+             'w': 'W'}])
+
+    # -- C6: array / map top-level column add ----------------------------
+
+    def test_add_array_and_map_columns(self):
+        s0 = pa.schema([('id', pa.int64()), ('val', pa.string())])
+        table = self._create('nested_add_arr_map', s0)
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'val': ['x']}, schema=s0))
+
+        arr_pa = pa.list_(pa.int64())
+        map_pa = pa.map_(pa.string(), pa.int64())
+        self.catalog.alter_table(
+            'default.nested_add_arr_map', [
+                SchemaChange.add_column('arr', _paimon_type(arr_pa)),
+                SchemaChange.add_column('m', _paimon_type(map_pa)),
+            ], False)
+        table = self.catalog.get_table('default.nested_add_arr_map')
+        s1 = pa.schema([('id', pa.int64()), ('val', pa.string()),
+                        ('arr', arr_pa), ('m', map_pa)])
+        self._write(table, pa.Table.from_pylist(
+            [{'id': 2, 'val': 'y', 'arr': [1, 2, 3],
+              'm': [('k', 7)]}], schema=s1))
+
+        rows = self._read_sorted(table)
+        self.assertEqual(rows[0], {'id': 1, 'val': 'x', 'arr': None, 'm': 
None})
+        self.assertEqual(rows[1]['arr'], [1, 2, 3])
+        self.assertEqual(dict(rows[1]['m']), {'k': 7})
+
+    # -- C3: rename a struct top-level column ----------------------------
+
+    def test_rename_struct_column(self):
+        # Renaming the top-level struct column keeps its field id, so old rows
+        # read their original struct back under the new name.
+        s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', 
pa.string())])
+        table = self._create('nested_rename_struct', s0)
+        self._write(table, pa.Table.from_pylist([
+            {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 
'val': 'x'},
+        ], schema=s0))
+
+        self.catalog.alter_table(
+            'default.nested_rename_struct',
+            [SchemaChange.rename_column('mv', 'mv2')], False)
+        table = self.catalog.get_table('default.nested_rename_struct')
+        s1 = pa.schema([('id', pa.int64()), ('mv2', _MV_PA), ('val', 
pa.string())])
+        self._write(table, pa.Table.from_pylist([
+            {'id': 2, 'mv2': {'latest_version': 200, 'latest_value': 'b'}, 
'val': 'y'},
+        ], schema=s1))
+
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'mv2': {'latest_version': 100, 'latest_value': 'a'}, 
'val': 'x'},
+            {'id': 2, 'mv2': {'latest_version': 200, 'latest_value': 'b'}, 
'val': 'y'}])
+
+
+class SchemaEvolutionNestedGapTest(_NestedBase):
+    """Sub-field-level evolution inside a struct is NOT implemented.
+
+    schema_manager handles only the top-level ``field_names[-1]``; a dotted
+    path like ``['mv', 'latest_value']`` never recurses into the RowType.
+    These tests assert the current behaviour (silent top-level mutation, or
+    ColumnNotExistException) so the gap is documented.
+    """
+
+    def _create_struct_table(self, name):
+        s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', 
pa.string())])
+        table = self._create(name, s0)
+        self._write(table, pa.Table.from_pylist([
+            {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 
'val': 'x'},
+        ], schema=s0))
+        return table
+
+    def _mv_subfield_names(self, table_name):
+        schema = self.catalog.get_table(
+            'default.{}'.format(table_name)).table_schema
+        mv = next(f for f in schema.fields if f.name == 'mv')
+        return [sf.name for sf in mv.type.fields]
+
+    def _top_level_names(self, table_name):
+        schema = self.catalog.get_table(
+            'default.{}'.format(table_name)).table_schema
+        return [f.name for f in schema.fields]
+
+    # -- C7: add nested sub-field -> silently adds a TOP-LEVEL column ----
+
+    def test_nested_add_subfield_mutates_top_level(self):
+        # GAP: add_column(['mv','new_inner']) does NOT add new_inner inside
+        # mv; it silently appends a top-level column 'new_inner' instead.
+        self._create_struct_table('gap_add')
+        self.catalog.alter_table(
+            'default.gap_add',
+            [SchemaChange.add_column(['mv', 'new_inner'], AtomicType('INT'))],
+            False)
+        # mv's sub-fields are unchanged; a stray top-level column appeared.
+        self.assertEqual(self._mv_subfield_names('gap_add'),
+                         ['latest_version', 'latest_value'])
+        self.assertIn('new_inner', self._top_level_names('gap_add'))
+
+    # -- C8/C9/C10: rename / update-type / drop nested sub-field ---------
+
+    def test_nested_rename_subfield_raises(self):
+        # GAP: field_names[-1]='latest_value' is looked up at the TOP level,
+        # where it does not exist -> ColumnNotExistException (wrapped).
+        self._create_struct_table('gap_rename')
+        with self.assertRaises(RuntimeError) as cm:
+            self.catalog.alter_table(
+                'default.gap_rename',
+                [SchemaChange.rename_column(['mv', 'latest_value'], 'lv')], 
False)
+        self.assertIn('latest_value', str(cm.exception))
+
+    def test_nested_update_subfield_type_raises(self):
+        self._create_struct_table('gap_update')
+        with self.assertRaises(RuntimeError) as cm:
+            self.catalog.alter_table(
+                'default.gap_update',
+                [SchemaChange.update_column_type(
+                    ['mv', 'latest_version'], AtomicType('BIGINT'))], False)
+        self.assertIn('latest_version', str(cm.exception))
+
+    def test_nested_drop_subfield_raises(self):
+        self._create_struct_table('gap_drop')
+        with self.assertRaises(RuntimeError) as cm:
+            self.catalog.alter_table(
+                'default.gap_drop',
+                [SchemaChange.drop_column(['mv', 'latest_value'])], False)
+        self.assertIn('latest_value', str(cm.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/schema_evolution_pk_read_test.py 
b/paimon-python/pypaimon/tests/schema_evolution_pk_read_test.py
new file mode 100644
index 0000000000..4df02bb33c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/schema_evolution_pk_read_test.py
@@ -0,0 +1,335 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Schema-evolution read tests for primary-key tables (merge engines).
+
+Each test creates a PK table (``bucket=1`` so same-PK rows share a bucket
+and the read goes through the merge reader), writes a batch under the
+original schema, evolves the schema via ``catalog.alter_table``, writes a
+second batch under the new schema, and reads back the merged result that
+spans both schema versions.
+
+Covers add / drop / type promotion / position as well as the field-id
+isolation cases (rename, drop-then-readd same name) that a name-based
+alignment would get wrong -- the merged result must follow field id.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.data_types import AtomicType
+from pypaimon.schema.schema_change import Move, SchemaChange
+
+
+class SchemaEvolutionPkReadTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create(self, name, pa_schema, merge_engine='deduplicate',
+                primary_keys=('id',), partition_keys=None, extra=None):
+        options = {'bucket': '1', 'merge-engine': merge_engine}
+        if extra:
+            options.update(extra)
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=list(primary_keys),
+            partition_keys=list(partition_keys) if partition_keys else None,
+            options=options,
+        )
+        full = 'default.{}'.format(name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full)
+
+    def _write(self, table, pa_table):
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa_table)
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _read_sorted(self, table, key='id'):
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        rows = rb.new_read().to_arrow(splits).to_pylist() if splits else []
+        return sorted(rows, key=lambda r: r[key])
+
+    @staticmethod
+    def _pk(name, arrow_type, nullable=True):
+        return pa.field(name, arrow_type, nullable=nullable)
+
+    # -- B1: add column + deduplicate ------------------------------------
+
+    def test_pk_add_column_deduplicate(self):
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('v', pa.string())])
+        table = self._create('pk_add_dedup', s0, 'deduplicate')
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1, 2], 'v': ['a', 'b']}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.pk_add_dedup',
+            [SchemaChange.add_column('w', AtomicType('STRING'))], False)
+        table = self.catalog.get_table('default.pk_add_dedup')
+        s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('v', pa.string()), ('w', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1, 3], 'v': ['a2', 'c'], 'w': ['w1', 'w3']}, schema=s1))
+
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'v': 'a2', 'w': 'w1'},   # latest wins
+            {'id': 2, 'v': 'b', 'w': None},    # old file, w padded NULL
+            {'id': 3, 'v': 'c', 'w': 'w3'}])
+
+    # -- B2: add column + partial-update ---------------------------------
+
+    def test_pk_add_column_partial_update(self):
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('a', pa.string())])
+        table = self._create('pk_add_partial', s0, 'partial-update')
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'a': ['A']}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.pk_add_partial',
+            [SchemaChange.add_column('b', AtomicType('STRING'))], False)
+        table = self.catalog.get_table('default.pk_add_partial')
+        s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('a', pa.string()), ('b', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'a': [None], 'b': ['B']}, schema=s1))
+
+        # partial-update merges non-null per field across the two versions.
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'a': 'A', 'b': 'B'}])
+
+    # -- B3: type promotion on a value column + deduplicate --------------
+
+    def test_pk_type_promotion_deduplicate(self):
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('n', pa.int32())])
+        table = self._create('pk_promote', s0, 'deduplicate')
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1, 2], 'n': [10, 20]}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.pk_promote',
+            [SchemaChange.update_column_type('n', AtomicType('BIGINT'))], 
False)
+        table = self.catalog.get_table('default.pk_promote')
+        s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('n', pa.int64())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1, 3], 'n': [100, 30]}, schema=s1))
+
+        rows = self._read_sorted(table)
+        self.assertEqual(rows, [
+            {'id': 1, 'n': 100}, {'id': 2, 'n': 20}, {'id': 3, 'n': 30}])
+
+    # -- B5: drop a value column + partial-update ------------------------
+
+    def test_pk_drop_column_partial_update(self):
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('a', pa.string()), ('b', pa.string())])
+        table = self._create('pk_drop_partial', s0, 'partial-update')
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'a': ['A'], 'b': [None]}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.pk_drop_partial', [SchemaChange.drop_column('b')], False)
+        table = self.catalog.get_table('default.pk_drop_partial')
+        s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('a', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'a': ['A2']}, schema=s1))
+
+        self.assertEqual(self._read_sorted(table), [{'id': 1, 'a': 'A2'}])
+
+    # -- B6: reorder a value column (position) + deduplicate -------------
+
+    def test_pk_column_position_deduplicate(self):
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('a', pa.string()), ('b', pa.string())])
+        table = self._create('pk_position', s0, 'deduplicate')
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'a': ['a1'], 'b': ['b1']}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.pk_position',
+            [SchemaChange.update_column_position(Move.first('b'))], False)
+        table = self.catalog.get_table('default.pk_position')
+        s1 = pa.schema([('b', pa.string()),
+                        self._pk('id', pa.int64(), nullable=False),
+                        ('a', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'b': ['b2'], 'id': [1], 'a': ['a2']}, schema=s1))
+
+        self.assertEqual(self._read_sorted(table), [
+            {'b': 'b2', 'id': 1, 'a': 'a2'}])
+
+    # -- B7: first-row engine + add column -------------------------------
+
+    def test_pk_first_row_add_column(self):
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('v', pa.string())])
+        table = self._create('pk_first_row', s0, 'first-row')
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'v': ['first']}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.pk_first_row',
+            [SchemaChange.add_column('w', AtomicType('STRING'))], False)
+        table = self.catalog.get_table('default.pk_first_row')
+        s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('v', pa.string()), ('w', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'v': ['second'], 'w': ['W']}, schema=s1))
+
+        # first-row keeps the earliest row; it predates column w, so w is NULL.
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'v': 'first', 'w': None}])
+
+    # -- B8: multi-version chain (add + promotion) + partial-update ------
+
+    def test_pk_multi_version_chain_partial_update(self):
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('a', pa.string()), ('n', pa.int32())])
+        table = self._create('pk_chain', s0, 'partial-update')
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'a': ['A'], 'n': [10]}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.pk_chain',
+            [SchemaChange.add_column('b', AtomicType('STRING'))], False)
+        table = self.catalog.get_table('default.pk_chain')
+        s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('a', pa.string()), ('n', pa.int32()), ('b', 
pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'a': [None], 'n': [None], 'b': ['B']}, schema=s1))
+
+        self.catalog.alter_table(
+            'default.pk_chain',
+            [SchemaChange.update_column_type('n', AtomicType('BIGINT'))], 
False)
+        table = self.catalog.get_table('default.pk_chain')
+        s2 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('a', pa.string()), ('n', pa.int64()), ('b', 
pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1], 'a': [None], 'n': [99], 'b': [None]}, schema=s2))
+
+        # partial-update across three schema versions: a from v0, b from v1,
+        # n (promoted) latest non-null from v2.
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'a': 'A', 'n': 99, 'b': 'B'}])
+
+    # -- B9 / B10: negative constraints ----------------------------------
+
+    def test_pk_cannot_update_primary_key_type(self):
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('v', pa.string())])
+        self._create('pk_no_pk_type', s0, 'deduplicate')
+        with self.assertRaises(RuntimeError) as cm:
+            self.catalog.alter_table(
+                'default.pk_no_pk_type',
+                [SchemaChange.update_column_type('id', AtomicType('BIGINT'))],
+                False)
+        self.assertIn('primary key', str(cm.exception))
+
+    def test_pk_cannot_drop_or_rename_partition_key(self):
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        self._pk('dt', pa.string(), nullable=False),
+                        ('v', pa.string())])
+        # PK must contain the partition key.
+        self._create('pk_part_guard', s0, 'deduplicate',
+                     primary_keys=('id', 'dt'), partition_keys=('dt',))
+        with self.assertRaises(RuntimeError) as cm:
+            self.catalog.alter_table(
+                'default.pk_part_guard', [SchemaChange.drop_column('dt')], 
False)
+        self.assertIn('partition', str(cm.exception).lower())
+        with self.assertRaises(RuntimeError):
+            self.catalog.alter_table(
+                'default.pk_part_guard',
+                [SchemaChange.rename_column('dt', 'dt2')], False)
+
+    # -- B4: rename a value column ---------------------------------------
+
+    def test_pk_rename_value_column(self):
+        # After rename, the old file's value (written under the old name) must
+        # still merge in under the new name -- alignment follows field id, so
+        # id=2 keeps its old 'b'.
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('v', pa.string())])
+        table = self._create('pk_rename', s0, 'deduplicate')
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1, 2], 'v': ['a', 'b']}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.pk_rename',
+            [SchemaChange.rename_column('v', 'v2')], False)
+        table = self.catalog.get_table('default.pk_rename')
+        s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('v2', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1, 3], 'v2': ['a2', 'c']}, schema=s1))
+
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'v2': 'a2'}, {'id': 2, 'v2': 'b'}, {'id': 3, 'v2': 'c'}])
+
+    # -- B11: drop a value column then re-add the same name --------------
+
+    def test_pk_drop_then_readd_same_name(self):
+        # The re-added v (new field id) must not revive the dropped column's
+        # old data: aligned by field id, id=2 (only in the old file) reads
+        # NULL for the new v.
+        s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('v', pa.string())])
+        table = self._create('pk_drop_readd', s0, 'deduplicate')
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1, 2], 'v': ['old1', 'old2']}, schema=s0))
+
+        self.catalog.alter_table(
+            'default.pk_drop_readd', [SchemaChange.drop_column('v')], False)
+        self.catalog.alter_table(
+            'default.pk_drop_readd',
+            [SchemaChange.add_column('v', AtomicType('STRING'))], False)
+        table = self.catalog.get_table('default.pk_drop_readd')
+        s2 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+                        ('v', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'id': [1, 3], 'v': ['new1', 'new3']}, schema=s2))
+
+        self.assertEqual(self._read_sorted(table), [
+            {'id': 1, 'v': 'new1'}, {'id': 2, 'v': None}, {'id': 3, 'v': 
'new3'}])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py 
b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index 0bc5c9f142..febbf8c4d8 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -26,7 +26,7 @@ import pyarrow as pa
 from pypaimon import CatalogFactory, Schema
 
 from pypaimon.schema.data_types import AtomicType
-from pypaimon.schema.schema_change import SchemaChange
+from pypaimon.schema.schema_change import Move, SchemaChange
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.schema.table_schema import TableSchema
 
@@ -546,6 +546,293 @@ class SchemaEvolutionReadTest(unittest.TestCase):
         }, schema=pa_schema)
         self.assertEqual(expected, actual)
 
+    # ------------------------------------------------------------------
+    # Public-API end-to-end evolution cases (create -> write -> alter_table
+    # -> write -> read), the范式 promoted by
+    # test_schema_evolution_type_promotion_unpartitioned. Unlike the older
+    # white-box cases above (manual schema-N files + file.schema_id), these
+    # drive evolution purely through catalog.alter_table.
+    # ------------------------------------------------------------------
+
+    def _write(self, table, pa_table):
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    def _read_rows(self, table, sort_key='k'):
+        read_builder = table.new_read_builder()
+        rows = read_builder.new_read().to_arrow(
+            self._scan_table(read_builder)).to_pylist()
+        return sorted(rows, key=lambda r: (r[sort_key] is None, r[sort_key]))
+
+    def _read_arrow(self, table, projection=None):
+        read_builder = table.new_read_builder()
+        if projection is not None:
+            read_builder = read_builder.with_projection(projection)
+        return read_builder.new_read().to_arrow(self._scan_table(read_builder))
+
+    def test_evolution_drop_column_then_read(self):
+        # A5: a column dropped after old data was written must disappear from
+        # the read; the old file's value for it is discarded, not surfaced.
+        name = 'default.evo_drop'
+        s0 = pa.schema([('k', pa.int64()), ('v', pa.string()), ('w', 
pa.string())])
+        self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+        table = self.catalog.get_table(name)
+        self._write(table, pa.Table.from_pydict(
+            {'k': [1, 2], 'v': ['a', 'b'], 'w': ['p', 'q']}, schema=s0))
+
+        self.catalog.alter_table(name, [SchemaChange.drop_column('v')], False)
+        table = self.catalog.get_table(name)
+        s1 = pa.schema([('k', pa.int64()), ('w', pa.string())])
+        self._write(table, pa.Table.from_pydict({'k': [3], 'w': ['r']}, 
schema=s1))
+
+        self.assertEqual(self._read_rows(table), [
+            {'k': 1, 'w': 'p'}, {'k': 2, 'w': 'q'}, {'k': 3, 'w': 'r'}])
+
+    def test_evolution_rename_column_then_read(self):
+        # A6: after rename the old file (written under the old name) and the
+        # new file (new name) must read back as the SAME logical column,
+        # matched by field id, not by name.
+        name = 'default.evo_rename'
+        s0 = pa.schema([('k', pa.int64()), ('v', pa.string())])
+        self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+        table = self.catalog.get_table(name)
+        self._write(table, pa.Table.from_pydict(
+            {'k': [1, 2], 'v': ['a', 'b']}, schema=s0))
+
+        self.catalog.alter_table(
+            name, [SchemaChange.rename_column('v', 'renamed')], False)
+        table = self.catalog.get_table(name)
+        s1 = pa.schema([('k', pa.int64()), ('renamed', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'k': [3], 'renamed': ['c']}, schema=s1))
+
+        self.assertEqual(self._read_rows(table), [
+            {'k': 1, 'renamed': 'a'},
+            {'k': 2, 'renamed': 'b'},
+            {'k': 3, 'renamed': 'c'}])
+
+    def test_evolution_column_position_then_read(self):
+        # A7: moving a column to FIRST must reorder the read schema; the old
+        # file (original order) must be remapped so values stay with their
+        # column.
+        name = 'default.evo_position'
+        s0 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('b', 
pa.string())])
+        self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+        table = self.catalog.get_table(name)
+        self._write(table, pa.Table.from_pydict(
+            {'k': [1], 'a': ['a1'], 'b': ['b1']}, schema=s0))
+
+        self.catalog.alter_table(
+            name, [SchemaChange.update_column_position(Move.first('b'))], 
False)
+        table = self.catalog.get_table(name)
+        s1 = pa.schema([('b', pa.string()), ('k', pa.int64()), ('a', 
pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'b': ['b2'], 'k': [2], 'a': ['a2']}, schema=s1))
+
+        arrow = self._read_arrow(table)
+        self.assertEqual(arrow.column_names, ['b', 'k', 'a'])
+        self.assertEqual(self._read_rows(table), [
+            {'b': 'b1', 'k': 1, 'a': 'a1'},
+            {'b': 'b2', 'k': 2, 'a': 'a2'}])
+
+    def test_evolution_multi_version_chain(self):
+        # A8: chain schema versions (add + type promotion), writing one batch
+        # per version then reading across all of them. Rename across versions
+        # is covered on its own by A6; this case isolates the add+promotion
+        # chain.
+        name = 'default.evo_chain'
+        s0 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('n', 
pa.int32())])
+        self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+        table = self.catalog.get_table(name)
+        self._write(table, pa.Table.from_pydict(
+            {'k': [1], 'a': ['a1'], 'n': [10]}, schema=s0))
+
+        # v1: add column b
+        self.catalog.alter_table(
+            name, [SchemaChange.add_column('b', AtomicType('STRING'))], False)
+        table = self.catalog.get_table(name)
+        s1 = pa.schema([('k', pa.int64()), ('a', pa.string()),
+                        ('n', pa.int32()), ('b', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'k': [2], 'a': ['a2'], 'n': [20], 'b': ['b2']}, schema=s1))
+
+        # v2: promote n int32 -> int64
+        self.catalog.alter_table(
+            name, [SchemaChange.update_column_type('n', AtomicType('BIGINT'))],
+            False)
+        table = self.catalog.get_table(name)
+        s2 = pa.schema([('k', pa.int64()), ('a', pa.string()),
+                        ('n', pa.int64()), ('b', pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'k': [3], 'a': ['a3'], 'n': [30], 'b': ['b3']}, schema=s2))
+
+        # v3: add column d
+        self.catalog.alter_table(
+            name, [SchemaChange.add_column('d', AtomicType('STRING'))], False)
+        table = self.catalog.get_table(name)
+        s3 = pa.schema([('k', pa.int64()), ('a', pa.string()),
+                        ('n', pa.int64()), ('b', pa.string()), ('d', 
pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'k': [4], 'a': ['a4'], 'n': [40], 'b': ['b4'], 'd': ['d4']},
+            schema=s3))
+
+        self.assertEqual(self._read_rows(table), [
+            {'k': 1, 'a': 'a1', 'n': 10, 'b': None, 'd': None},
+            {'k': 2, 'a': 'a2', 'n': 20, 'b': 'b2', 'd': None},
+            {'k': 3, 'a': 'a3', 'n': 30, 'b': 'b3', 'd': None},
+            {'k': 4, 'a': 'a4', 'n': 40, 'b': 'b4', 'd': 'd4'}])
+
+    def test_evolution_projection_after_add_column(self):
+        # A10: projecting a subset that includes the newly added column must
+        # return NULL for that column on rows from old files.
+        name = 'default.evo_projection'
+        s0 = pa.schema([('k', pa.int64()), ('a', pa.string())])
+        self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+        table = self.catalog.get_table(name)
+        self._write(table, pa.Table.from_pydict(
+            {'k': [1, 2], 'a': ['a1', 'a2']}, schema=s0))
+
+        self.catalog.alter_table(
+            name, [SchemaChange.add_column('b', AtomicType('STRING'))], False)
+        table = self.catalog.get_table(name)
+        s1 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('b', 
pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'k': [3], 'a': ['a3'], 'b': ['b3']}, schema=s1))
+
+        rows = sorted(
+            self._read_arrow(table, projection=['k', 'b']).to_pylist(),
+            key=lambda r: r['k'])
+        self.assertEqual(rows, [
+            {'k': 1, 'b': None}, {'k': 2, 'b': None}, {'k': 3, 'b': 'b3'}])
+
+    def test_evolution_nullability_then_read(self):
+        # A11: relaxing NOT NULL -> nullable must let later NULLs read back
+        # while the old (non-null) file still reads correctly.
+        name = 'default.evo_nullability'
+        s0 = pa.schema([('k', pa.int64()),
+                        pa.field('v', pa.string(), nullable=False)])
+        self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+        table = self.catalog.get_table(name)
+        self._write(table, pa.Table.from_pydict(
+            {'k': [1, 2], 'v': ['a', 'b']}, schema=s0))
+
+        self.catalog.alter_table(
+            name, [SchemaChange.update_column_nullability('v', True)], False)
+        table = self.catalog.get_table(name)
+        s1 = pa.schema([('k', pa.int64()), ('v', pa.string())])
+        self._write(table, pa.Table.from_pydict({'k': [3], 'v': [None]}, 
schema=s1))
+
+        self.assertEqual(self._read_rows(table), [
+            {'k': 1, 'v': 'a'}, {'k': 2, 'v': 'b'}, {'k': 3, 'v': None}])
+
+    def test_evolution_drop_then_readd_same_name(self):
+        # A13 (name-vs-field-id isolation): dropping v then adding a new v
+        # (fresh field id) must NOT revive the old file's v. Because columns
+        # are aligned by field id, the dropped v's id is gone and the re-added
+        # v's new id is absent from old files, so old rows read NULL. Covers
+        # same-type and changed-type re-add.
+        for label, readd_paimon, readd_arrow, new_val in [
+            ('same_type', 'STRING', pa.string(), ['new3']),
+            ('changed_type', 'INT', pa.int32(), [99]),
+        ]:
+            with self.subTest(case=label):
+                name = 'default.evo_drop_readd_{}'.format(label)
+                s0 = pa.schema([('k', pa.int64()), ('v', pa.string())])
+                self.catalog.create_table(
+                    name, Schema.from_pyarrow_schema(s0), False)
+                table = self.catalog.get_table(name)
+                self._write(table, pa.Table.from_pydict(
+                    {'k': [1, 2], 'v': ['old1', 'old2']}, schema=s0))
+
+                self.catalog.alter_table(
+                    name, [SchemaChange.drop_column('v')], False)
+                self.catalog.alter_table(
+                    name, [SchemaChange.add_column('v', 
AtomicType(readd_paimon))],
+                    False)
+                table = self.catalog.get_table(name)
+                s2 = pa.schema([('k', pa.int64()), ('v', readd_arrow)])
+                self._write(table, pa.Table.from_pydict(
+                    {'k': [3], 'v': new_val}, schema=s2))
+
+                self.assertEqual(self._read_rows(table), [
+                    {'k': 1, 'v': None},
+                    {'k': 2, 'v': None},
+                    {'k': 3, 'v': new_val[0]}])
+
+    def test_evolution_rename_then_add_same_name(self):
+        # A14 (name-vs-field-id isolation): rename a->b, then add a new a
+        # (fresh field id). Aligned by field id, reading old files b (old id)
+        # carries the old a-data and the new a (new id) reads NULL -- a
+        # name-based alignment would instead feed the old physical column 'a'
+        # to the new a and leave b empty.
+        name = 'default.evo_rename_readd'
+        s0 = pa.schema([('k', pa.int64()), ('a', pa.string())])
+        self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+        table = self.catalog.get_table(name)
+        self._write(table, pa.Table.from_pydict(
+            {'k': [1, 2], 'a': ['old1', 'old2']}, schema=s0))
+
+        self.catalog.alter_table(
+            name, [SchemaChange.rename_column('a', 'b')], False)
+        self.catalog.alter_table(
+            name, [SchemaChange.add_column('a', AtomicType('STRING'))], False)
+        table = self.catalog.get_table(name)
+        s2 = pa.schema([('k', pa.int64()), ('b', pa.string()), ('a', 
pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'k': [3], 'b': ['B3'], 'a': ['A3']}, schema=s2))
+
+        self.assertEqual(self._read_rows(table), [
+            {'k': 1, 'b': 'old1', 'a': None},
+            {'k': 2, 'b': 'old2', 'a': None},
+            {'k': 3, 'b': 'B3', 'a': 'A3'}])
+
+    def test_evolution_column_swap_direct_rejected(self):
+        # A15a: a one-shot name swap is rejected -- renaming a->b while b
+        # still exists collides on the existing name. (Constraint check, this
+        # behaviour is correct.)
+        name = 'default.evo_swap_direct'
+        s0 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('b', 
pa.string())])
+        self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+        # alter_table wraps the underlying ColumnAlreadyExistException in a
+        # RuntimeError (filesystem_catalog catch-all), so match on that.
+        with self.assertRaises(RuntimeError) as cm:
+            self.catalog.alter_table(name, [
+                SchemaChange.rename_column('a', 'b'),
+                SchemaChange.rename_column('b', 'a'),
+            ], False)
+        self.assertIn('already exists', str(cm.exception))
+
+    def test_evolution_column_swap_via_temp_name(self):
+        # A15b (name-vs-field-id isolation): a 3-step swap via a temp name
+        # keeps field ids stable, so on read the old data follows the id, not
+        # the name -- old 'a1' (id of a) ends up under column b, old 'b1'
+        # under column a. A name-based alignment would drop/misalign both.
+        name = 'default.evo_swap_temp'
+        s0 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('b', 
pa.string())])
+        self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+        table = self.catalog.get_table(name)
+        self._write(table, pa.Table.from_pydict(
+            {'k': [1], 'a': ['a1'], 'b': ['b1']}, schema=s0))
+
+        self.catalog.alter_table(name, [
+            SchemaChange.rename_column('a', '__tmp'),
+            SchemaChange.rename_column('b', 'a'),
+            SchemaChange.rename_column('__tmp', 'b'),
+        ], False)
+        table = self.catalog.get_table(name)
+        s1 = pa.schema([('k', pa.int64()), ('b', pa.string()), ('a', 
pa.string())])
+        self._write(table, pa.Table.from_pydict(
+            {'k': [2], 'b': ['B2'], 'a': ['A2']}, schema=s1))
+
+        self.assertEqual(self._read_rows(table), [
+            {'k': 1, 'b': 'a1', 'a': 'b1'},
+            {'k': 2, 'b': 'B2', 'a': 'A2'}])
+
     def _write_test_table(self, table):
         write_builder = table.new_batch_write_builder()
 

Reply via email to