This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cf5db9970a [python] Align read-time schema evolution by field id (#8126)
cf5db9970a is described below
commit cf5db9970aa084302be8be4155be694f54979bc0
Author: chaoyang <[email protected]>
AuthorDate: Sat Jun 6 16:57:03 2026 +0800
[python] Align read-time schema evolution by field id (#8126)
Read-time schema evolution matched a data file's columns to the read
schema by **name** rather than by **field id**. So after a column rename
the old file's column became unreachable and its rows read `NULL`, and
dropping a column then re-adding the same name revived the dropped
column's stale data.
Align per file by field id instead: read each file under its own field
names/types and normalize to the latest read schema by id — a rename
follows the id, and a re-added name gets a fresh id and pads `NULL`.
Non-evolving reads stay zero-copy.
Adds regression tests for rename / drop-then-readd / swap across
append-only, primary-key (merge engines) and nested top-level columns.
---
.../pypaimon/read/reader/data_file_batch_reader.py | 56 +++-
paimon-python/pypaimon/read/split_read.py | 101 +++++--
.../tests/schema_evolution_nested_read_test.py | 317 +++++++++++++++++++
.../tests/schema_evolution_pk_read_test.py | 335 +++++++++++++++++++++
.../pypaimon/tests/schema_evolution_read_test.py | 289 +++++++++++++++++-
5 files changed, 1073 insertions(+), 25 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 21d1b2a911..e3bd511439 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -40,7 +40,9 @@ class DataFileBatchReader(RecordBatchReader):
row_tracking_enabled: bool,
system_fields: dict,
file_io: Optional[FileIO] = None,
- row_id_offsets: Optional[List[int]] = None):
+ row_id_offsets: Optional[List[int]] = None,
+ file_data_fields: Optional[List[DataField]] = None,
+ target_data_fields: Optional[List[DataField]] = None):
self.format_reader = format_reader
self.index_mapping = index_mapping
self.partition_info = partition_info
@@ -53,6 +55,57 @@ class DataFileBatchReader(RecordBatchReader):
self.max_sequence_number = max_sequence_number
self.system_fields = system_fields
self.file_io = file_io
+ # Per-file field-id normalization: map the physically-read columns
+ # (the file's own field order/names) onto the latest read target by
+ # field id, padding missing ids with NULL. ``None`` when there is no
+ # evolution to reconcile (identity) -- the common path stays zero-copy.
+ self._normalize_positions, self._normalize_names = \
+ self._build_normalize_plan(file_data_fields, target_data_fields)
+
+ @staticmethod
+ def _build_normalize_plan(file_data_fields, target_data_fields):
+ """Build a per-file field-id alignment plan.
+
+ Returns ``(positions, names)`` where ``positions[i]`` is the column
+ index in the physically-read batch carrying ``target_data_fields[i]``
+ (matched by field id), or -1 if the file does not contain that id (pad
+ NULL). ``names[i]`` is the latest target name. Returns ``(None, None)``
+ when the plan is the identity (no evolution), so the caller skips
+ normalization and stays zero-copy.
+ """
+ if file_data_fields is None or target_data_fields is None:
+ return None, None
+ file_id_to_pos = {f.id: i for i, f in enumerate(file_data_fields)}
+ positions = []
+ names = []
+ # Identity only when every target maps to the same physical position
+ # AND already carries the same name -- a rename keeps the position but
+ # changes the name, which still requires a relabel pass.
+ identity = len(file_data_fields) == len(target_data_fields)
+ for i, target in enumerate(target_data_fields):
+ pos = file_id_to_pos.get(target.id, -1)
+ positions.append(pos)
+ names.append(target.name)
+ if pos != i or (pos >= 0 and file_data_fields[pos].name != target.name):
+ identity = False
+ if identity:
+ return None, None
+ return positions, names
+
+ def _normalize_batch(self, record_batch: RecordBatch) -> RecordBatch:
+ """Reorder/pad the physically-read batch onto the latest read target by
+ field id, and relabel columns to the latest names. Missing ids become
+ all-NULL columns; types are reconciled later by _align_batch_to_read_schema."""
+ if self._normalize_positions is None:
+ return record_batch
+ num_rows = record_batch.num_rows
+ arrays = []
+ for pos in self._normalize_positions:
+ if pos < 0:
+ arrays.append(pa.nulls(num_rows))
+ else:
+ arrays.append(record_batch.column(pos))
+ return pa.RecordBatch.from_arrays(arrays, names=self._normalize_names)
def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
if isinstance(self.format_reader, FormatBlobReader):
@@ -61,6 +114,7 @@ class DataFileBatchReader(RecordBatchReader):
record_batch = self.format_reader.read_arrow_batch()
if record_batch is None:
return None
+ record_batch = self._normalize_batch(record_batch)
if self.partition_info is None and self.index_mapping is None:
# A file written under an older schema (e.g. before an INT -> BIGINT
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index fe76527431..543c858935 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -217,14 +217,27 @@ class SplitRead(ABC):
nested_path_by_name = self._nested_path_by_name()
has_nested = nested_path_by_name is not None
- # Cover both the merge-internal aliases (``_KEY_id``) and the
- # bare user-facing PK name (``id``) the file actually stores.
- name_to_field: Dict[str, DataField] = {f.name: f for f in self.read_fields}
- _, _trimmed_lookup_fields = self._get_trimmed_fields(
- self._get_read_data_fields(), self._get_all_data_fields()
- )
- for f in _trimmed_lookup_fields:
- name_to_field.setdefault(f.name, f)
+ # Field-id based per-file read (non-nested): select the file's OWN
+ # physical fields by field id and read them under the file's original
+ # names/types. A normalize step in DataFileBatchReader then aligns the
+ # batch to the latest read schema by field id (not by name), so a
+ # rename follows the id and a dropped-then-readded name cannot revive
+ # stale data. Nested-projection reads stay on the legacy name path.
+ file_read_fields = None if has_nested else self._file_read_fields(file)
+ target_fields = None if has_nested else self._target_read_fields()
+ if file_read_fields is not None:
+ read_file_fields = [f.name for f in file_read_fields]
+ name_to_field: Dict[str, DataField] = {
+ f.name: f for f in file_read_fields}
+ else:
+ # Cover both the merge-internal aliases (``_KEY_id``) and the
+ # bare user-facing PK name (``id``) the file actually stores.
+ name_to_field = {f.name: f for f in self.read_fields}
+ _, _trimmed_lookup_fields = self._get_trimmed_fields(
+ self._get_read_data_fields(), self._get_all_data_fields()
+ )
+ for f in _trimmed_lookup_fields:
+ name_to_field.setdefault(f.name, f)
format_reader: RecordBatchReader
if file_format == CoreOptions.FILE_FORMAT_AVRO:
@@ -342,7 +355,9 @@ class SplitRead(ABC):
row_tracking_enabled,
system_fields,
file_io=self.table.file_io,
- row_id_offsets=row_indices)
+ row_id_offsets=row_indices,
+ file_data_fields=file_read_fields,
+ target_data_fields=target_fields)
else:
reader = DataFileBatchReader(
format_reader,
@@ -355,7 +370,9 @@ class SplitRead(ABC):
row_tracking_enabled,
system_fields,
file_io=self.table.file_io,
- row_id_offsets=row_indices)
+ row_id_offsets=row_indices,
+ file_data_fields=file_read_fields,
+ target_data_fields=target_fields)
# For non-Vortex formats, wrap with RowIdFilterRecordBatchReader
if row_ranges is not None and row_indices is None:
@@ -401,16 +418,54 @@ class SplitRead(ABC):
return self.schema_id_2_fields[key]
@abstractmethod
+ def _all_data_fields_from(self, fields: List[DataField]) -> List[DataField]:
+ """Apply this split-read's data-field shaping (row-tracking / kv
+ wrapping) to the given base ``fields``. Called both for the latest
+ table schema and for an older file schema."""
+
def _get_all_data_fields(self):
- """Get all data fields"""
+ return self._all_data_fields_from(self.table.fields)
def _get_read_data_fields(self):
- read_data_fields = []
+ return self._read_data_fields_from(self._get_all_data_fields())
+
+ def _read_data_fields_from(self, all_data_fields):
read_field_ids = {field.id for field in self.read_fields}
- for data_field in self._get_all_data_fields():
- if data_field.id in read_field_ids:
- read_data_fields.append(data_field)
- return read_data_fields
+ return [f for f in all_data_fields if f.id in read_field_ids]
+
+ def _final_data_fields_from(self, all_data_fields: List[DataField]) -> List[DataField]:
+ """The per-position target fields a batch must end up as: trimmed for
+ kv (``_KEY_*``) duplicates and stripped of partition columns. The
+ DataField analogue of ``_get_final_read_data_fields()``. Called with
+ the latest-schema fields it yields the read target (latest names +
+ types); called with a file's fields it yields what to physically read
+ from that file (the file's own names + types)."""
+ _, trimmed = self._get_trimmed_fields(
+ self._read_data_fields_from(all_data_fields), all_data_fields)
+ partition_keys = self.table.partition_keys
+ if not partition_keys:
+ return list(trimmed)
+ return [f for f in trimmed if f.name not in partition_keys]
+
+ def _target_read_fields(self) -> Optional[List[DataField]]:
+ """Latest-schema target fields (names + types) that a normalized batch
+ must align to, in order. None for nested-projection reads (kept on the
+ legacy name-based path)."""
+ if self._nested_path_by_name() is not None:
+ return None
+ return self._final_data_fields_from(self._get_all_data_fields())
+
+ def _file_read_fields(self, file: DataFileMeta) -> Optional[List[DataField]]:
+ """The fields to physically read from ``file``, in the file's own
+ names/types, selected by field id against the read set. None for
+ nested-projection reads."""
+ if self._nested_path_by_name() is not None:
+ return None
+ file_schema = self.table.schema_manager.get_schema(file.schema_id)
+ if file_schema is None:
+ return None
+ return self._final_data_fields_from(
+ self._all_data_fields_from(file_schema.fields))
def _create_key_value_fields(self, value_field: List[DataField]):
all_fields: List[DataField] = self.table.fields
@@ -629,10 +684,10 @@ class RawFileSplitRead(SplitRead):
reader = LimitedRecordBatchReader(reader, self.limit)
return reader
- def _get_all_data_fields(self):
+ def _all_data_fields_from(self, fields):
if self.row_tracking_enabled:
- return SpecialFields.row_type_with_row_tracking(self.table.fields)
- return self.table.fields
+ return SpecialFields.row_type_with_row_tracking(fields)
+ return fields
class MergeFileSplitRead(SplitRead):
@@ -760,8 +815,8 @@ class MergeFileSplitRead(SplitRead):
reader = LimitedRecordReader(reader, self.limit)
return reader
- def _get_all_data_fields(self):
- return self._create_key_value_fields(self.table.fields)
+ def _all_data_fields_from(self, fields):
+ return self._create_key_value_fields(fields)
class DataEvolutionSplitRead(SplitRead):
@@ -1114,5 +1169,5 @@ class DataEvolutionSplitRead(SplitRead):
field_ids.append(SpecialFields.SEQUENCE_NUMBER.id)
return field_ids
- def _get_all_data_fields(self):
- return SpecialFields.row_type_with_row_tracking(self.table.fields)
+ def _all_data_fields_from(self, fields):
+ return SpecialFields.row_type_with_row_tracking(fields)
diff --git a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py
new file mode 100644
index 0000000000..c9147b823f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Schema-evolution read tests for nested (struct/array/map) types.
+
+Two layers are covered:
+
+* Whole-column evolution of a top-level struct/array/map column
+ (add / drop / rename / projection) -- aligned by the column's field id.
+* Sub-field evolution INSIDE a struct (add/rename/update-type/drop a nested
+ field via a dotted ``field_names`` path) -- this is NOT implemented:
+ ``schema_manager`` only operates on the top-level ``field_names[-1]``.
+ ``SchemaEvolutionNestedGapTest`` locks in the current behaviour with
+ explicit assertions so the gap is documented and any future fix is
+ noticed.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.data_types import AtomicType, PyarrowFieldParser
+from pypaimon.schema.schema_change import SchemaChange
+
+
+def _paimon_type(pa_type, nullable=True):
+ return PyarrowFieldParser.to_paimon_type(pa_type, nullable)
+
+
+_MV_PA = pa.struct([('latest_version', pa.int64()), ('latest_value', pa.string())])
+
+
+class _NestedBase(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', True)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def _create(self, name, pa_schema, primary_keys=None, bucket='-1'):
+ options = {'bucket': bucket}
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ primary_keys=list(primary_keys) if primary_keys else None,
+ options=options,
+ )
+ full = 'default.{}'.format(name)
+ self.catalog.create_table(full, schema, False)
+ return self.catalog.get_table(full)
+
+ def _write(self, table, pa_table):
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ c = wb.new_commit()
+ try:
+ w.write_arrow(pa_table)
+ c.commit(w.prepare_commit())
+ finally:
+ w.close()
+ c.close()
+
+ def _read_sorted(self, table, key='id', projection=None):
+ rb = table.new_read_builder()
+ if projection is not None:
+ rb = rb.with_projection(projection)
+ splits = rb.new_scan().plan().splits()
+ rows = rb.new_read().to_arrow(splits).to_pylist() if splits else []
+ return sorted(rows, key=lambda r: r[key])
+
+
+class SchemaEvolutionNestedReadTest(_NestedBase):
+ """Top-level struct/array/map column evolution (works)."""
+
+ # -- C1: add a new struct top-level column ---------------------------
+
+ def test_add_struct_column(self):
+ s0 = pa.schema([('id', pa.int64()), ('val', pa.string())])
+ table = self._create('nested_add_struct', s0)
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1, 2], 'val': ['x', 'y']}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.nested_add_struct',
+ [SchemaChange.add_column('mv', _paimon_type(_MV_PA))], False)
+ table = self.catalog.get_table('default.nested_add_struct')
+ s1 = pa.schema([('id', pa.int64()), ('val', pa.string()), ('mv', _MV_PA)])
+ self._write(table, pa.Table.from_pylist(
+ [{'id': 3, 'val': 'z',
+ 'mv': {'latest_version': 300, 'latest_value': 'c'}}], schema=s1))
+
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'val': 'x', 'mv': None},
+ {'id': 2, 'val': 'y', 'mv': None},
+ {'id': 3, 'val': 'z',
+ 'mv': {'latest_version': 300, 'latest_value': 'c'}}])
+
+ # -- C2: drop a struct top-level column ------------------------------
+
+ def test_drop_struct_column(self):
+ s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())])
+ table = self._create('nested_drop_struct', s0)
+ self._write(table, pa.Table.from_pylist([
+ {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
+ {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'},
+ ], schema=s0))
+
+ self.catalog.alter_table(
+ 'default.nested_drop_struct', [SchemaChange.drop_column('mv')], False)
+ table = self.catalog.get_table('default.nested_drop_struct')
+ s1 = pa.schema([('id', pa.int64()), ('val', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [3], 'val': ['z']}, schema=s1))
+
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'val': 'x'}, {'id': 2, 'val': 'y'}, {'id': 3, 'val': 'z'}])
+
+ # -- C4: nested leaf projection under evolution ----------------------
+
+ def test_nested_projection_after_add_column(self):
+ s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())])
+ table = self._create('nested_proj_evo', s0)
+ self._write(table, pa.Table.from_pylist([
+ {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
+ ], schema=s0))
+
+ self.catalog.alter_table(
+ 'default.nested_proj_evo',
+ [SchemaChange.add_column('extra', AtomicType('STRING'))], False)
+ table = self.catalog.get_table('default.nested_proj_evo')
+ s1 = pa.schema([('id', pa.int64()), ('mv', _MV_PA),
+ ('val', pa.string()), ('extra', pa.string())])
+ self._write(table, pa.Table.from_pylist([
+ {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'},
+ 'val': 'y', 'extra': 'E'},
+ ], schema=s1))
+
+ rows = self._read_sorted(
+ table, projection=['id', 'mv.latest_version', 'extra'])
+ self.assertEqual(rows, [
+ {'id': 1, 'mv_latest_version': 100, 'extra': None},
+ {'id': 2, 'mv_latest_version': 200, 'extra': 'E'}])
+
+ # -- C5: PK table struct column + add column merge read --------------
+
+ def test_pk_struct_add_column_merge(self):
+ s0 = pa.schema([pa.field('id', pa.int64(), nullable=False),
+ ('mv', _MV_PA)])
+ table = self._create('nested_pk_struct', s0,
+ primary_keys=['id'], bucket='1')
+ rows0 = [{'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}}]
+ self._write(table, pa.Table.from_pylist(rows0, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.nested_pk_struct',
+ [SchemaChange.add_column('w', AtomicType('STRING'))], False)
+ table = self.catalog.get_table('default.nested_pk_struct')
+ s1 = pa.schema([pa.field('id', pa.int64(), nullable=False),
+ ('mv', _MV_PA), ('w', pa.string())])
+ self._write(table, pa.Table.from_pylist(
+ [{'id': 1, 'mv': {'latest_version': 101, 'latest_value': 'a2'},
+ 'w': 'W'}], schema=s1))
+
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'mv': {'latest_version': 101, 'latest_value': 'a2'},
+ 'w': 'W'}])
+
+ # -- C6: array / map top-level column add ----------------------------
+
+ def test_add_array_and_map_columns(self):
+ s0 = pa.schema([('id', pa.int64()), ('val', pa.string())])
+ table = self._create('nested_add_arr_map', s0)
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'val': ['x']}, schema=s0))
+
+ arr_pa = pa.list_(pa.int64())
+ map_pa = pa.map_(pa.string(), pa.int64())
+ self.catalog.alter_table(
+ 'default.nested_add_arr_map', [
+ SchemaChange.add_column('arr', _paimon_type(arr_pa)),
+ SchemaChange.add_column('m', _paimon_type(map_pa)),
+ ], False)
+ table = self.catalog.get_table('default.nested_add_arr_map')
+ s1 = pa.schema([('id', pa.int64()), ('val', pa.string()),
+ ('arr', arr_pa), ('m', map_pa)])
+ self._write(table, pa.Table.from_pylist(
+ [{'id': 2, 'val': 'y', 'arr': [1, 2, 3],
+ 'm': [('k', 7)]}], schema=s1))
+
+ rows = self._read_sorted(table)
+ self.assertEqual(rows[0], {'id': 1, 'val': 'x', 'arr': None, 'm': None})
+ self.assertEqual(rows[1]['arr'], [1, 2, 3])
+ self.assertEqual(dict(rows[1]['m']), {'k': 7})
+
+ # -- C3: rename a struct top-level column ----------------------------
+
+ def test_rename_struct_column(self):
+ # Renaming the top-level struct column keeps its field id, so old rows
+ # read their original struct back under the new name.
+ s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())])
+ table = self._create('nested_rename_struct', s0)
+ self._write(table, pa.Table.from_pylist([
+ {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
+ ], schema=s0))
+
+ self.catalog.alter_table(
+ 'default.nested_rename_struct',
+ [SchemaChange.rename_column('mv', 'mv2')], False)
+ table = self.catalog.get_table('default.nested_rename_struct')
+ s1 = pa.schema([('id', pa.int64()), ('mv2', _MV_PA), ('val', pa.string())])
+ self._write(table, pa.Table.from_pylist([
+ {'id': 2, 'mv2': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'},
+ ], schema=s1))
+
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'mv2': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
+ {'id': 2, 'mv2': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'}])
+
+
+class SchemaEvolutionNestedGapTest(_NestedBase):
+ """Sub-field-level evolution inside a struct is NOT implemented.
+
+ schema_manager handles only the top-level ``field_names[-1]``; a dotted
+ path like ``['mv', 'latest_value']`` never recurses into the RowType.
+ These tests assert the current behaviour (silent top-level mutation, or
+ ColumnNotExistException) so the gap is documented.
+ """
+
+ def _create_struct_table(self, name):
+ s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())])
+ table = self._create(name, s0)
+ self._write(table, pa.Table.from_pylist([
+ {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
+ ], schema=s0))
+ return table
+
+ def _mv_subfield_names(self, table_name):
+ schema = self.catalog.get_table(
+ 'default.{}'.format(table_name)).table_schema
+ mv = next(f for f in schema.fields if f.name == 'mv')
+ return [sf.name for sf in mv.type.fields]
+
+ def _top_level_names(self, table_name):
+ schema = self.catalog.get_table(
+ 'default.{}'.format(table_name)).table_schema
+ return [f.name for f in schema.fields]
+
+ # -- C7: add nested sub-field -> silently adds a TOP-LEVEL column ----
+
+ def test_nested_add_subfield_mutates_top_level(self):
+ # GAP: add_column(['mv','new_inner']) does NOT add new_inner inside
+ # mv; it silently appends a top-level column 'new_inner' instead.
+ self._create_struct_table('gap_add')
+ self.catalog.alter_table(
+ 'default.gap_add',
+ [SchemaChange.add_column(['mv', 'new_inner'], AtomicType('INT'))],
+ False)
+ # mv's sub-fields are unchanged; a stray top-level column appeared.
+ self.assertEqual(self._mv_subfield_names('gap_add'),
+ ['latest_version', 'latest_value'])
+ self.assertIn('new_inner', self._top_level_names('gap_add'))
+
+ # -- C8/C9/C10: rename / update-type / drop nested sub-field ---------
+
+ def test_nested_rename_subfield_raises(self):
+ # GAP: field_names[-1]='latest_value' is looked up at the TOP level,
+ # where it does not exist -> ColumnNotExistException (wrapped).
+ self._create_struct_table('gap_rename')
+ with self.assertRaises(RuntimeError) as cm:
+ self.catalog.alter_table(
+ 'default.gap_rename',
+ [SchemaChange.rename_column(['mv', 'latest_value'], 'lv')], False)
+ self.assertIn('latest_value', str(cm.exception))
+
+ def test_nested_update_subfield_type_raises(self):
+ self._create_struct_table('gap_update')
+ with self.assertRaises(RuntimeError) as cm:
+ self.catalog.alter_table(
+ 'default.gap_update',
+ [SchemaChange.update_column_type(
+ ['mv', 'latest_version'], AtomicType('BIGINT'))], False)
+ self.assertIn('latest_version', str(cm.exception))
+
+ def test_nested_drop_subfield_raises(self):
+ self._create_struct_table('gap_drop')
+ with self.assertRaises(RuntimeError) as cm:
+ self.catalog.alter_table(
+ 'default.gap_drop',
+ [SchemaChange.drop_column(['mv', 'latest_value'])], False)
+ self.assertIn('latest_value', str(cm.exception))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/schema_evolution_pk_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_pk_read_test.py
new file mode 100644
index 0000000000..4df02bb33c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/schema_evolution_pk_read_test.py
@@ -0,0 +1,335 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Schema-evolution read tests for primary-key tables (merge engines).
+
+Each test creates a PK table (``bucket=1`` so same-PK rows share a bucket
+and the read goes through the merge reader), writes a batch under the
+original schema, evolves the schema via ``catalog.alter_table``, writes a
+second batch under the new schema, and reads back the merged result that
+spans both schema versions.
+
+Covers add / drop / type promotion / position as well as the field-id
+isolation cases (rename, drop-then-readd same name) that a name-based
+alignment would get wrong -- the merged result must follow field id.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.data_types import AtomicType
+from pypaimon.schema.schema_change import Move, SchemaChange
+
+
+class SchemaEvolutionPkReadTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', True)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def _create(self, name, pa_schema, merge_engine='deduplicate',
+ primary_keys=('id',), partition_keys=None, extra=None):
+ options = {'bucket': '1', 'merge-engine': merge_engine}
+ if extra:
+ options.update(extra)
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ primary_keys=list(primary_keys),
+ partition_keys=list(partition_keys) if partition_keys else None,
+ options=options,
+ )
+ full = 'default.{}'.format(name)
+ self.catalog.create_table(full, schema, False)
+ return self.catalog.get_table(full)
+
+ def _write(self, table, pa_table):
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ c = wb.new_commit()
+ try:
+ w.write_arrow(pa_table)
+ c.commit(w.prepare_commit())
+ finally:
+ w.close()
+ c.close()
+
+ def _read_sorted(self, table, key='id'):
+ rb = table.new_read_builder()
+ splits = rb.new_scan().plan().splits()
+ rows = rb.new_read().to_arrow(splits).to_pylist() if splits else []
+ return sorted(rows, key=lambda r: r[key])
+
+ @staticmethod
+ def _pk(name, arrow_type, nullable=True):
+ return pa.field(name, arrow_type, nullable=nullable)
+
+ # -- B1: add column + deduplicate ------------------------------------
+
+ def test_pk_add_column_deduplicate(self):
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('v', pa.string())])
+ table = self._create('pk_add_dedup', s0, 'deduplicate')
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1, 2], 'v': ['a', 'b']}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.pk_add_dedup',
+ [SchemaChange.add_column('w', AtomicType('STRING'))], False)
+ table = self.catalog.get_table('default.pk_add_dedup')
+ s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('v', pa.string()), ('w', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1, 3], 'v': ['a2', 'c'], 'w': ['w1', 'w3']}, schema=s1))
+
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'v': 'a2', 'w': 'w1'}, # latest wins
+ {'id': 2, 'v': 'b', 'w': None}, # old file, w padded NULL
+ {'id': 3, 'v': 'c', 'w': 'w3'}])
+
+ # -- B2: add column + partial-update ---------------------------------
+
+ def test_pk_add_column_partial_update(self):
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('a', pa.string())])
+ table = self._create('pk_add_partial', s0, 'partial-update')
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'a': ['A']}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.pk_add_partial',
+ [SchemaChange.add_column('b', AtomicType('STRING'))], False)
+ table = self.catalog.get_table('default.pk_add_partial')
+ s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('a', pa.string()), ('b', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'a': [None], 'b': ['B']}, schema=s1))
+
+ # partial-update merges non-null per field across the two versions.
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'a': 'A', 'b': 'B'}])
+
+ # -- B3: type promotion on a value column + deduplicate --------------
+
+ def test_pk_type_promotion_deduplicate(self):
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('n', pa.int32())])
+ table = self._create('pk_promote', s0, 'deduplicate')
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1, 2], 'n': [10, 20]}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.pk_promote',
+ [SchemaChange.update_column_type('n', AtomicType('BIGINT'))], False)
+ table = self.catalog.get_table('default.pk_promote')
+ s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('n', pa.int64())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1, 3], 'n': [100, 30]}, schema=s1))
+
+ rows = self._read_sorted(table)
+ self.assertEqual(rows, [
+ {'id': 1, 'n': 100}, {'id': 2, 'n': 20}, {'id': 3, 'n': 30}])
+
+ # -- B5: drop a value column + partial-update ------------------------
+
+ def test_pk_drop_column_partial_update(self):
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('a', pa.string()), ('b', pa.string())])
+ table = self._create('pk_drop_partial', s0, 'partial-update')
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'a': ['A'], 'b': [None]}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.pk_drop_partial', [SchemaChange.drop_column('b')], False)
+ table = self.catalog.get_table('default.pk_drop_partial')
+ s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('a', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'a': ['A2']}, schema=s1))
+
+ self.assertEqual(self._read_sorted(table), [{'id': 1, 'a': 'A2'}])
+
+ # -- B6: reorder a value column (position) + deduplicate -------------
+
+ def test_pk_column_position_deduplicate(self):
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('a', pa.string()), ('b', pa.string())])
+ table = self._create('pk_position', s0, 'deduplicate')
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'a': ['a1'], 'b': ['b1']}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.pk_position',
+ [SchemaChange.update_column_position(Move.first('b'))], False)
+ table = self.catalog.get_table('default.pk_position')
+ s1 = pa.schema([('b', pa.string()),
+ self._pk('id', pa.int64(), nullable=False),
+ ('a', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'b': ['b2'], 'id': [1], 'a': ['a2']}, schema=s1))
+
+ self.assertEqual(self._read_sorted(table), [
+ {'b': 'b2', 'id': 1, 'a': 'a2'}])
+
+ # -- B7: first-row engine + add column -------------------------------
+
+ def test_pk_first_row_add_column(self):
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('v', pa.string())])
+ table = self._create('pk_first_row', s0, 'first-row')
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'v': ['first']}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.pk_first_row',
+ [SchemaChange.add_column('w', AtomicType('STRING'))], False)
+ table = self.catalog.get_table('default.pk_first_row')
+ s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('v', pa.string()), ('w', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'v': ['second'], 'w': ['W']}, schema=s1))
+
+ # first-row keeps the earliest row; it predates column w, so w is NULL.
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'v': 'first', 'w': None}])
+
+ # -- B8: multi-version chain (add + promotion) + partial-update ------
+
+ def test_pk_multi_version_chain_partial_update(self):
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('a', pa.string()), ('n', pa.int32())])
+ table = self._create('pk_chain', s0, 'partial-update')
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'a': ['A'], 'n': [10]}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.pk_chain',
+ [SchemaChange.add_column('b', AtomicType('STRING'))], False)
+ table = self.catalog.get_table('default.pk_chain')
+ s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('a', pa.string()), ('n', pa.int32()), ('b', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'a': [None], 'n': [None], 'b': ['B']}, schema=s1))
+
+ self.catalog.alter_table(
+ 'default.pk_chain',
+ [SchemaChange.update_column_type('n', AtomicType('BIGINT'))], False)
+ table = self.catalog.get_table('default.pk_chain')
+ s2 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('a', pa.string()), ('n', pa.int64()), ('b', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1], 'a': [None], 'n': [99], 'b': [None]}, schema=s2))
+
+ # partial-update across three schema versions: a from v0, b from v1,
+ # n (promoted) latest non-null from v2.
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'a': 'A', 'n': 99, 'b': 'B'}])
+
+ # -- B9 / B10: negative constraints ----------------------------------
+
+ def test_pk_cannot_update_primary_key_type(self):
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('v', pa.string())])
+ self._create('pk_no_pk_type', s0, 'deduplicate')
+ with self.assertRaises(RuntimeError) as cm:
+ self.catalog.alter_table(
+ 'default.pk_no_pk_type',
+ [SchemaChange.update_column_type('id', AtomicType('BIGINT'))],
+ False)
+ self.assertIn('primary key', str(cm.exception))
+
+ def test_pk_cannot_drop_or_rename_partition_key(self):
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ self._pk('dt', pa.string(), nullable=False),
+ ('v', pa.string())])
+ # PK must contain the partition key.
+ self._create('pk_part_guard', s0, 'deduplicate',
+ primary_keys=('id', 'dt'), partition_keys=('dt',))
+ with self.assertRaises(RuntimeError) as cm:
+ self.catalog.alter_table(
+ 'default.pk_part_guard', [SchemaChange.drop_column('dt')], False)
+ self.assertIn('partition', str(cm.exception).lower())
+ with self.assertRaises(RuntimeError):
+ self.catalog.alter_table(
+ 'default.pk_part_guard',
+ [SchemaChange.rename_column('dt', 'dt2')], False)
+
+ # -- B4: rename a value column ---------------------------------------
+
+ def test_pk_rename_value_column(self):
+ # After rename, the old file's value (written under the old name) must
+ # still merge in under the new name -- alignment follows field id, so
+ # id=2 keeps its old value 'b'.
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('v', pa.string())])
+ table = self._create('pk_rename', s0, 'deduplicate')
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1, 2], 'v': ['a', 'b']}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.pk_rename',
+ [SchemaChange.rename_column('v', 'v2')], False)
+ table = self.catalog.get_table('default.pk_rename')
+ s1 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('v2', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1, 3], 'v2': ['a2', 'c']}, schema=s1))
+
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'v2': 'a2'}, {'id': 2, 'v2': 'b'}, {'id': 3, 'v2': 'c'}])
+
+ # -- B11: drop a value column then re-add the same name --------------
+
+ def test_pk_drop_then_readd_same_name(self):
+ # The re-added v (new field id) must not revive the dropped column's
+ # old data: aligned by field id, id=2 (only in the old file) reads
+ # NULL for the new v.
+ s0 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('v', pa.string())])
+ table = self._create('pk_drop_readd', s0, 'deduplicate')
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1, 2], 'v': ['old1', 'old2']}, schema=s0))
+
+ self.catalog.alter_table(
+ 'default.pk_drop_readd', [SchemaChange.drop_column('v')], False)
+ self.catalog.alter_table(
+ 'default.pk_drop_readd',
+ [SchemaChange.add_column('v', AtomicType('STRING'))], False)
+ table = self.catalog.get_table('default.pk_drop_readd')
+ s2 = pa.schema([self._pk('id', pa.int64(), nullable=False),
+ ('v', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'id': [1, 3], 'v': ['new1', 'new3']}, schema=s2))
+
+ self.assertEqual(self._read_sorted(table), [
+ {'id': 1, 'v': 'new1'}, {'id': 2, 'v': None}, {'id': 3, 'v': 'new3'}])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index 0bc5c9f142..febbf8c4d8 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -26,7 +26,7 @@ import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.schema.data_types import AtomicType
-from pypaimon.schema.schema_change import SchemaChange
+from pypaimon.schema.schema_change import Move, SchemaChange
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.table_schema import TableSchema
@@ -546,6 +546,293 @@ class SchemaEvolutionReadTest(unittest.TestCase):
}, schema=pa_schema)
self.assertEqual(expected, actual)
+ # ------------------------------------------------------------------
+ # Public-API end-to-end evolution cases (create -> write -> alter_table
+ # -> write -> read), the pattern promoted by
+ # test_schema_evolution_type_promotion_unpartitioned. Unlike the older
+ # white-box cases above (manual schema-N files + file.schema_id), these
+ # drive evolution purely through catalog.alter_table.
+ # ------------------------------------------------------------------
+
+ def _write(self, table, pa_table):
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ def _read_rows(self, table, sort_key='k'):
+ read_builder = table.new_read_builder()
+ rows = read_builder.new_read().to_arrow(
+ self._scan_table(read_builder)).to_pylist()
+ return sorted(rows, key=lambda r: (r[sort_key] is None, r[sort_key]))
+
+ def _read_arrow(self, table, projection=None):
+ read_builder = table.new_read_builder()
+ if projection is not None:
+ read_builder = read_builder.with_projection(projection)
+ return read_builder.new_read().to_arrow(self._scan_table(read_builder))
+
+ def test_evolution_drop_column_then_read(self):
+ # A5: a column dropped after old data was written must disappear from
+ # the read; the old file's value for it is discarded, not surfaced.
+ name = 'default.evo_drop'
+ s0 = pa.schema([('k', pa.int64()), ('v', pa.string()), ('w', pa.string())])
+ self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+ table = self.catalog.get_table(name)
+ self._write(table, pa.Table.from_pydict(
+ {'k': [1, 2], 'v': ['a', 'b'], 'w': ['p', 'q']}, schema=s0))
+
+ self.catalog.alter_table(name, [SchemaChange.drop_column('v')], False)
+ table = self.catalog.get_table(name)
+ s1 = pa.schema([('k', pa.int64()), ('w', pa.string())])
+ self._write(table, pa.Table.from_pydict({'k': [3], 'w': ['r']}, schema=s1))
+
+ self.assertEqual(self._read_rows(table), [
+ {'k': 1, 'w': 'p'}, {'k': 2, 'w': 'q'}, {'k': 3, 'w': 'r'}])
+
+ def test_evolution_rename_column_then_read(self):
+ # A6: after rename the old file (written under the old name) and the
+ # new file (new name) must read back as the SAME logical column,
+ # matched by field id, not by name.
+ name = 'default.evo_rename'
+ s0 = pa.schema([('k', pa.int64()), ('v', pa.string())])
+ self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+ table = self.catalog.get_table(name)
+ self._write(table, pa.Table.from_pydict(
+ {'k': [1, 2], 'v': ['a', 'b']}, schema=s0))
+
+ self.catalog.alter_table(
+ name, [SchemaChange.rename_column('v', 'renamed')], False)
+ table = self.catalog.get_table(name)
+ s1 = pa.schema([('k', pa.int64()), ('renamed', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'k': [3], 'renamed': ['c']}, schema=s1))
+
+ self.assertEqual(self._read_rows(table), [
+ {'k': 1, 'renamed': 'a'},
+ {'k': 2, 'renamed': 'b'},
+ {'k': 3, 'renamed': 'c'}])
+
+ def test_evolution_column_position_then_read(self):
+ # A7: moving a column to FIRST must reorder the read schema; the old
+ # file (original order) must be remapped so values stay with their
+ # column.
+ name = 'default.evo_position'
+ s0 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('b', pa.string())])
+ self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+ table = self.catalog.get_table(name)
+ self._write(table, pa.Table.from_pydict(
+ {'k': [1], 'a': ['a1'], 'b': ['b1']}, schema=s0))
+
+ self.catalog.alter_table(
+ name, [SchemaChange.update_column_position(Move.first('b'))], False)
+ table = self.catalog.get_table(name)
+ s1 = pa.schema([('b', pa.string()), ('k', pa.int64()), ('a', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'b': ['b2'], 'k': [2], 'a': ['a2']}, schema=s1))
+
+ arrow = self._read_arrow(table)
+ self.assertEqual(arrow.column_names, ['b', 'k', 'a'])
+ self.assertEqual(self._read_rows(table), [
+ {'b': 'b1', 'k': 1, 'a': 'a1'},
+ {'b': 'b2', 'k': 2, 'a': 'a2'}])
+
+ def test_evolution_multi_version_chain(self):
+ # A8: chain schema versions (add + type promotion), writing one batch
+ # per version then reading across all of them. Rename across versions
+ # is covered on its own by A6; this case isolates the add+promotion
+ # chain.
+ name = 'default.evo_chain'
+ s0 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('n', pa.int32())])
+ self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+ table = self.catalog.get_table(name)
+ self._write(table, pa.Table.from_pydict(
+ {'k': [1], 'a': ['a1'], 'n': [10]}, schema=s0))
+
+ # v1: add column b
+ self.catalog.alter_table(
+ name, [SchemaChange.add_column('b', AtomicType('STRING'))], False)
+ table = self.catalog.get_table(name)
+ s1 = pa.schema([('k', pa.int64()), ('a', pa.string()),
+ ('n', pa.int32()), ('b', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'k': [2], 'a': ['a2'], 'n': [20], 'b': ['b2']}, schema=s1))
+
+ # v2: promote n int32 -> int64
+ self.catalog.alter_table(
+ name, [SchemaChange.update_column_type('n', AtomicType('BIGINT'))],
+ False)
+ table = self.catalog.get_table(name)
+ s2 = pa.schema([('k', pa.int64()), ('a', pa.string()),
+ ('n', pa.int64()), ('b', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'k': [3], 'a': ['a3'], 'n': [30], 'b': ['b3']}, schema=s2))
+
+ # v3: add column d
+ self.catalog.alter_table(
+ name, [SchemaChange.add_column('d', AtomicType('STRING'))], False)
+ table = self.catalog.get_table(name)
+ s3 = pa.schema([('k', pa.int64()), ('a', pa.string()),
+ ('n', pa.int64()), ('b', pa.string()), ('d', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'k': [4], 'a': ['a4'], 'n': [40], 'b': ['b4'], 'd': ['d4']},
+ schema=s3))
+
+ self.assertEqual(self._read_rows(table), [
+ {'k': 1, 'a': 'a1', 'n': 10, 'b': None, 'd': None},
+ {'k': 2, 'a': 'a2', 'n': 20, 'b': 'b2', 'd': None},
+ {'k': 3, 'a': 'a3', 'n': 30, 'b': 'b3', 'd': None},
+ {'k': 4, 'a': 'a4', 'n': 40, 'b': 'b4', 'd': 'd4'}])
+
+ def test_evolution_projection_after_add_column(self):
+ # A10: projecting a subset that includes the newly added column must
+ # return NULL for that column on rows from old files.
+ name = 'default.evo_projection'
+ s0 = pa.schema([('k', pa.int64()), ('a', pa.string())])
+ self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+ table = self.catalog.get_table(name)
+ self._write(table, pa.Table.from_pydict(
+ {'k': [1, 2], 'a': ['a1', 'a2']}, schema=s0))
+
+ self.catalog.alter_table(
+ name, [SchemaChange.add_column('b', AtomicType('STRING'))], False)
+ table = self.catalog.get_table(name)
+ s1 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('b', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'k': [3], 'a': ['a3'], 'b': ['b3']}, schema=s1))
+
+ rows = sorted(
+ self._read_arrow(table, projection=['k', 'b']).to_pylist(),
+ key=lambda r: r['k'])
+ self.assertEqual(rows, [
+ {'k': 1, 'b': None}, {'k': 2, 'b': None}, {'k': 3, 'b': 'b3'}])
+
+ def test_evolution_nullability_then_read(self):
+ # A11: relaxing NOT NULL -> nullable must let later NULLs read back
+ # while the old (non-null) file still reads correctly.
+ name = 'default.evo_nullability'
+ s0 = pa.schema([('k', pa.int64()),
+ pa.field('v', pa.string(), nullable=False)])
+ self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+ table = self.catalog.get_table(name)
+ self._write(table, pa.Table.from_pydict(
+ {'k': [1, 2], 'v': ['a', 'b']}, schema=s0))
+
+ self.catalog.alter_table(
+ name, [SchemaChange.update_column_nullability('v', True)], False)
+ table = self.catalog.get_table(name)
+ s1 = pa.schema([('k', pa.int64()), ('v', pa.string())])
+ self._write(table, pa.Table.from_pydict({'k': [3], 'v': [None]}, schema=s1))
+
+ self.assertEqual(self._read_rows(table), [
+ {'k': 1, 'v': 'a'}, {'k': 2, 'v': 'b'}, {'k': 3, 'v': None}])
+
+ def test_evolution_drop_then_readd_same_name(self):
+ # A13 (name-vs-field-id isolation): dropping v then adding a new v
+ # (fresh field id) must NOT revive the old file's v. Because columns
+ # are aligned by field id, the dropped v's id is gone and the re-added
+ # v's new id is absent from old files, so old rows read NULL. Covers
+ # same-type and changed-type re-add.
+ for label, readd_paimon, readd_arrow, new_val in [
+ ('same_type', 'STRING', pa.string(), ['new3']),
+ ('changed_type', 'INT', pa.int32(), [99]),
+ ]:
+ with self.subTest(case=label):
+ name = 'default.evo_drop_readd_{}'.format(label)
+ s0 = pa.schema([('k', pa.int64()), ('v', pa.string())])
+ self.catalog.create_table(
+ name, Schema.from_pyarrow_schema(s0), False)
+ table = self.catalog.get_table(name)
+ self._write(table, pa.Table.from_pydict(
+ {'k': [1, 2], 'v': ['old1', 'old2']}, schema=s0))
+
+ self.catalog.alter_table(
+ name, [SchemaChange.drop_column('v')], False)
+ self.catalog.alter_table(
+ name, [SchemaChange.add_column('v', AtomicType(readd_paimon))],
+ False)
+ table = self.catalog.get_table(name)
+ s2 = pa.schema([('k', pa.int64()), ('v', readd_arrow)])
+ self._write(table, pa.Table.from_pydict(
+ {'k': [3], 'v': new_val}, schema=s2))
+
+ self.assertEqual(self._read_rows(table), [
+ {'k': 1, 'v': None},
+ {'k': 2, 'v': None},
+ {'k': 3, 'v': new_val[0]}])
+
+ def test_evolution_rename_then_add_same_name(self):
+ # A14 (name-vs-field-id isolation): rename a->b, then add a new a
+ # (fresh field id). When reading old files by field id, b (old id)
+ # carries the old a-data and the new a (new id) reads NULL -- a
+ # name-based alignment would instead feed the old physical column 'a'
+ # to the new a and leave b empty.
+ name = 'default.evo_rename_readd'
+ s0 = pa.schema([('k', pa.int64()), ('a', pa.string())])
+ self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+ table = self.catalog.get_table(name)
+ self._write(table, pa.Table.from_pydict(
+ {'k': [1, 2], 'a': ['old1', 'old2']}, schema=s0))
+
+ self.catalog.alter_table(
+ name, [SchemaChange.rename_column('a', 'b')], False)
+ self.catalog.alter_table(
+ name, [SchemaChange.add_column('a', AtomicType('STRING'))], False)
+ table = self.catalog.get_table(name)
+ s2 = pa.schema([('k', pa.int64()), ('b', pa.string()), ('a', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'k': [3], 'b': ['B3'], 'a': ['A3']}, schema=s2))
+
+ self.assertEqual(self._read_rows(table), [
+ {'k': 1, 'b': 'old1', 'a': None},
+ {'k': 2, 'b': 'old2', 'a': None},
+ {'k': 3, 'b': 'B3', 'a': 'A3'}])
+
+ def test_evolution_column_swap_direct_rejected(self):
+ # A15a: a one-shot name swap is rejected -- renaming a->b while b
+ # still exists collides on the existing name. (This is a constraint
+ # check; the rejection is the correct behaviour.)
+ name = 'default.evo_swap_direct'
+ s0 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('b', pa.string())])
+ self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+ # alter_table wraps the underlying ColumnAlreadyExistException in a
+ # RuntimeError (filesystem_catalog catch-all), so match on that.
+ with self.assertRaises(RuntimeError) as cm:
+ self.catalog.alter_table(name, [
+ SchemaChange.rename_column('a', 'b'),
+ SchemaChange.rename_column('b', 'a'),
+ ], False)
+ self.assertIn('already exists', str(cm.exception))
+
+ def test_evolution_column_swap_via_temp_name(self):
+ # A15b (name-vs-field-id isolation): a 3-step swap via a temp name
+ # keeps field ids stable, so on read the old data follows the id, not
+ # the name -- old 'a1' (id of a) ends up under column b, old 'b1'
+ # under column a. A name-based alignment would drop/misalign both.
+ name = 'default.evo_swap_temp'
+ s0 = pa.schema([('k', pa.int64()), ('a', pa.string()), ('b', pa.string())])
+ self.catalog.create_table(name, Schema.from_pyarrow_schema(s0), False)
+ table = self.catalog.get_table(name)
+ self._write(table, pa.Table.from_pydict(
+ {'k': [1], 'a': ['a1'], 'b': ['b1']}, schema=s0))
+
+ self.catalog.alter_table(name, [
+ SchemaChange.rename_column('a', '__tmp'),
+ SchemaChange.rename_column('b', 'a'),
+ SchemaChange.rename_column('__tmp', 'b'),
+ ], False)
+ table = self.catalog.get_table(name)
+ s1 = pa.schema([('k', pa.int64()), ('b', pa.string()), ('a', pa.string())])
+ self._write(table, pa.Table.from_pydict(
+ {'k': [2], 'b': ['B2'], 'a': ['A2']}, schema=s1))
+
+ self.assertEqual(self._read_rows(table), [
+ {'k': 1, 'b': 'a1', 'a': 'b1'},
+ {'k': 2, 'b': 'B2', 'a': 'A2'}])
+
def _write_test_table(self, table):
write_builder = table.new_batch_write_builder()