This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 267f5e62ce [Python] Fix data evolution read IndexError when file has
no write_cols (#7556)
267f5e62ce is described below
commit 267f5e62ce5bd930502924baeea75e9397ee079e
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Mar 30 20:40:57 2026 +0800
[Python] Fix data evolution read IndexError when file has no write_cols
(#7556)
When a data file has no write_cols (written with all columns before
schema evolution), `_create_union_reader` wrongly assumes it contains
all current table fields including columns added later, causing
IndexError on read. This PR fixes it.
---
paimon-python/pypaimon/read/split_read.py | 9 ++-
.../pypaimon/tests/data_evolution_test.py | 81 ++++++++++++++++++++++
2 files changed, 88 insertions(+), 2 deletions(-)
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 41904267f7..ea53c3cf96 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -613,8 +613,13 @@ class DataEvolutionSplitRead(SplitRead):
elif first_file.write_cols:
field_ids = self._get_field_ids_from_write_cols(first_file.write_cols)
else:
- # For regular files, get all field IDs from the schema
- field_ids = [field.id for field in self.table.fields]
+ # For regular files without write_cols, derive field IDs from
+ # the file's schema version, not the current table schema.
+ # The file only contains columns from when it was written.
+ file_schema = self.table.schema_manager.get_schema(first_file.schema_id)
+ field_ids = [field.id for field in file_schema.fields]
+ field_ids.append(SpecialFields.ROW_ID.id)
+ field_ids.append(SpecialFields.SEQUENCE_NUMBER.id)
read_fields = []
for j, read_field_id in enumerate(read_field_index):
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 796625f6a4..266041eec5 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -1356,3 +1356,84 @@ class DataEvolutionTest(unittest.TestCase):
rebuilt = pa.RecordBatch.from_arrays(arrays, names=batch.schema.names)
self.assertTrue(rebuilt.schema.field('_ROW_ID').nullable)
self.assertTrue(rebuilt.schema.field('_SEQUENCE_NUMBER').nullable)
+
+ def test_read_full_schema_on_write_before_evolution(self):
+ from pypaimon.schema.schema_change import SchemaChange
+ from pypaimon.schema.data_types import AtomicType
+
+ # Step 1: Create table with [f0, f1]
+ initial_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ initial_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
+ )
+ table_name = 'default.test_no_write_cols_schema_evo'
+ self.catalog.create_table(table_name, schema, False)
+ table = self.catalog.get_table(table_name)
+
+ # Step 2: Write data with ALL columns of old schema → write_cols = None
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(pa.Table.from_pydict(
+ {'f0': [1, 2], 'f1': ['a', 'b']},
+ schema=initial_schema,
+ ))
+ cmts = table_write.prepare_commit()
+ for c in cmts:
+ for nf in c.new_files:
+ self.assertIsNone(nf.write_cols)
+ table_commit.commit(cmts)
+ table_write.close()
+ table_commit.close()
+
+ # Step 3: Schema evolution - add column f2
+ self.catalog.alter_table(
+ table_name,
+ [SchemaChange.add_column('f2', AtomicType('STRING'))],
+ )
+ table = self.catalog.get_table(table_name)
+
+ # Step 4: Write f2 only for same rows (first_row_id = 0)
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write().with_write_type(['f2'])
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(pa.Table.from_pydict(
+ {'f2': ['x', 'y']},
+ schema=pa.schema([('f2', pa.string())]),
+ ))
+ cmts_f2 = table_write.prepare_commit()
+ for c in cmts_f2:
+ for nf in c.new_files:
+ nf.first_row_id = 0
+ table_commit.commit(cmts_f2)
+ table_write.close()
+ table_commit.close()
+
+ # Step 5: Read all columns
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ splits = table_scan.plan().splits()
+
+ for split in splits:
+ for f in split.files:
+ if f.write_cols is None:
+ f.max_sequence_number = 999999
+
+ actual = table_read.to_arrow(splits)
+
+ expect = pa.Table.from_pydict({
+ 'f0': [1, 2],
+ 'f1': ['a', 'b'],
+ 'f2': ['x', 'y'],
+ }, schema=pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ]))
+ self.assertEqual(actual.num_rows, 2)
+ self.assertEqual(actual, expect)