This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 29561ea43a [python] Fix partial update for normal column on blob/vector tables (#8147)
29561ea43a is described below
commit 29561ea43acac5878adc361ded2071a46f994fd1
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Jun 7 22:53:08 2026 +0800
[python] Fix partial update for normal column on blob/vector tables (#8147)
---
paimon-python/pypaimon/tests/blob_table_test.py | 128 ++++++++++++++++++++
.../tests/ray_data_evolution_merge_into_test.py | 1 -
paimon-python/pypaimon/tests/vector_table_test.py | 131 +++++++++++++++++++++
.../pypaimon/write/writer/data_vector_writer.py | 6 +-
paimon-python/pypaimon/write/writer/data_writer.py | 10 ++
.../write/writer/dedicated_format_writer.py | 6 +-
6 files changed, 276 insertions(+), 6 deletions(-)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py
b/paimon-python/pypaimon/tests/blob_table_test.py
index 7619fcb6de..e3af63ad52 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -3804,6 +3804,134 @@ class DedicatedFormatWriterTest(unittest.TestCase):
table = self.catalog.get_table('test_db.nested_blob_name_no_error')
self.assertIsNotNone(table)
+    def test_blob_table_partial_update_non_blob_column(self):
+        """Updating only a normal column by _ROW_ID must leave other columns intact.
+
+        Writes two rows to a row-tracking / data-evolution table that has a
+        blob column, rewrites ``name`` via ``TableUpdateByRowId``, then reads
+        the full table back.
+        """
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        }
+        s = Schema.from_pyarrow_schema(pa_schema, options=opts)
+        table_name = 'test_db.blob_de_seq'
+        self.catalog.create_table(table_name, s, False)
+
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pydict(
+            {'id': [1, 2], 'name': ['a', 'b'], 'picture': [None, None]},
+            schema=pa_schema,
+        ))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+
+        # Fetch the row ids assigned by row tracking so the update can target them.
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        rb = rb.with_projection(['name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits)
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated', 'updated'], type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['name'], ['updated', 'updated'])
+        # Columns that were not part of the update must be left untouched.
+        self.assertEqual(result['id'], [1, 2])
+        self.assertEqual(result['picture'], [None, None])
+
+    def test_blob_table_partial_update_non_blob_column_with_rolling_files(self):
+        """Partial update of a normal column when the writer rolls files.
+
+        A tiny ``target-file-size`` makes both the initial write and the update
+        emit at least two normal (non-blob, non-vector) data files. Each file
+        must carry the file-relative sequence range ``[0, row_count - 1]``.
+        """
+        from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'target-file-size': '1KB',
+        }
+        s = Schema.from_pyarrow_schema(pa_schema, options=opts)
+        table_name = 'test_db.blob_de_seq_rolling'
+        self.catalog.create_table(table_name, s, False)
+
+        write_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write().with_write_type(['id', 'name'])
+        for start in (0, 1000):
+            ids = list(range(start, start + 1000))
+            w.write_arrow(pa.Table.from_pydict(
+                {
+                    'id': ids,
+                    # ~2KB per value guarantees rolling past the 1KB target size.
+                    'name': [f'name_{i}_' + 'x' * 2048 for i in ids],
+                },
+                schema=write_schema,
+            ))
+        commit_messages = w.prepare_commit()
+        normal_files = [
+            f for msg in commit_messages for f in msg.new_files
+            if not DataFileMeta.is_blob_file(f.file_name)
+            and not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(normal_files), 2)
+        for file in normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        wb.new_commit().commit(commit_messages)
+        w.close()
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits).sort_by('id')
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated'] * source.num_rows, type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        update_normal_files = [
+            f for msg in msgs for f in msg.new_files
+            if not DataFileMeta.is_blob_file(f.file_name)
+            and not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(update_normal_files), 2)
+        for file in update_normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name'])
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['id'], list(range(2000)))
+        self.assertEqual(result['name'], ['updated'] * 2000)
+
class GetBlobTest(unittest.TestCase):
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index 7e365009db..cd2745dc0f 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -1957,7 +1957,6 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
self.assertEqual(out['name'], ['old', 'young', 'senior'])
self.assertEqual(out['age'], [10, 20, 30])
- @unittest.skip("blocked by blob DE sequence bug fix, see PR #8147")
@unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
def test_self_merge_blob_source_condition(self):
blob_schema = pa.schema([
diff --git a/paimon-python/pypaimon/tests/vector_table_test.py
b/paimon-python/pypaimon/tests/vector_table_test.py
index 75a97f19a8..0f96455bbd 100644
--- a/paimon-python/pypaimon/tests/vector_table_test.py
+++ b/paimon-python/pypaimon/tests/vector_table_test.py
@@ -340,6 +340,137 @@ class VectorTableWriteReadTest(unittest.TestCase):
self.assertEqual(result.num_rows, 3)
+    def test_vector_table_partial_update_non_vector_column(self):
+        """Updating only a normal column by _ROW_ID must leave the vector intact.
+
+        Writes two rows to a row-tracking / data-evolution table that has a
+        fixed-size-list vector column, rewrites ``name`` via
+        ``TableUpdateByRowId``, then reads the full table back.
+        """
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        vector_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('embedding', pa.list_(pa.float32(), 4)),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+        }
+        s = Schema.from_pyarrow_schema(vector_schema, options=opts)
+        table_name = 'test_db.vector_de_seq'
+        self.catalog.create_table(table_name, s, False)
+
+        embeddings = [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]]
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pydict(
+            {
+                'id': [1, 2],
+                'name': ['a', 'b'],
+                'embedding': embeddings,
+            },
+            schema=vector_schema,
+        ))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+
+        # Fetch the row ids assigned by row tracking so the update can target them.
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        rb = rb.with_projection(['name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits)
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated', 'updated'], type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['name'], ['updated', 'updated'])
+        # Columns that were not part of the update must be left untouched.
+        self.assertEqual(result['id'], [1, 2])
+        self.assertEqual(result['embedding'], embeddings)
+
+    def test_vector_table_partial_update_non_vector_column_with_rolling_files(self):
+        """Partial update of a normal column on a vector table with rolled files.
+
+        A tiny ``target-file-size`` makes both the initial write and the update
+        emit at least two non-vector data files. Each file must carry the
+        file-relative sequence range ``[0, row_count - 1]``.
+        """
+        from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        vector_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('embedding', pa.list_(pa.float32(), 4)),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+            'target-file-size': '1KB',
+        }
+        s = Schema.from_pyarrow_schema(vector_schema, options=opts)
+        table_name = 'test_db.vector_de_seq_rolling'
+        self.catalog.create_table(table_name, s, False)
+
+        write_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write().with_write_type(['id', 'name'])
+        for start in (0, 1000):
+            ids = list(range(start, start + 1000))
+            w.write_arrow(pa.Table.from_pydict(
+                {
+                    'id': ids,
+                    # ~2KB per value guarantees rolling past the 1KB target size.
+                    'name': [f'name_{i}_' + 'x' * 2048 for i in ids],
+                },
+                schema=write_schema,
+            ))
+        commit_messages = w.prepare_commit()
+        normal_files = [
+            f for msg in commit_messages for f in msg.new_files
+            if not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(normal_files), 2)
+        for file in normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        wb.new_commit().commit(commit_messages)
+        w.close()
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits).sort_by('id')
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated'] * source.num_rows, type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        update_normal_files = [
+            f for msg in msgs for f in msg.new_files
+            if not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(update_normal_files), 2)
+        for file in update_normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name'])
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['id'], list(range(2000)))
+        self.assertEqual(result['name'], ['updated'] * 2000)
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py
b/paimon-python/pypaimon/write/writer/data_vector_writer.py
index d06d425936..e63a916ed4 100644
--- a/paimon-python/pypaimon/write/writer/data_vector_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py
@@ -226,6 +226,8 @@ class DataVectorWriter(DataWriter):
stats_columns = self.normal_columns if metadata_stats_enabled else []
value_stats = self._collect_value_stats(data, stats_columns)
+ min_seq, max_seq = self._append_file_sequence_range(data.num_rows)
+
return DataFileMeta.create(
file_name=file_name,
file_size=self.file_io.get_file_size(file_path),
@@ -234,8 +236,8 @@ class DataVectorWriter(DataWriter):
max_key=GenericRow([], []),
key_stats=SimpleStats.empty_stats(),
value_stats=value_stats,
- min_sequence_number=-1,
- max_sequence_number=-1,
+ min_sequence_number=min_seq,
+ max_sequence_number=max_seq,
schema_id=self.table.table_schema.id,
level=0,
extra_files=[],
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py
b/paimon-python/pypaimon/write/writer/data_writer.py
index 9f10e8a64e..e237bf9f37 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -164,6 +164,16 @@ class DataWriter(ABC):
def _merge_data(self, existing_data: pa.RecordBatch, new_data:
pa.RecordBatch) -> pa.RecordBatch:
"""Merge existing data with new data. Must be implemented by
subclasses."""
+    def _append_file_sequence_range(self, row_count: int) -> Tuple[int, int]:
+        """Return the (min, max) sequence numbers to record for a new append file.
+
+        When data evolution is enabled, the file gets the file-relative range
+        ``[0, row_count - 1]``, one sequence number per row. Otherwise the
+        ``(-1, -1)`` placeholder is returned.
+
+        Args:
+            row_count: number of rows in the file being written.
+
+        Returns:
+            ``(min_sequence_number, max_sequence_number)``.
+
+        Raises:
+            ValueError: if ``row_count`` is not positive.
+        """
+        if row_count <= 0:
+            raise ValueError(f"row_count must be positive, got {row_count}")
+
+        if self.options.data_evolution_enabled(False):
+            # Not the final numbering: the row-tracking commit is expected to
+            # re-stamp this file-relative range using the snapshot id.
+            # NOTE(review): confirm against the row-tracking commit path.
+            return 0, row_count - 1
+
+        return -1, -1
+
def _check_and_roll_if_needed(self):
while self.pending_data is not None:
current_size = self.pending_data.nbytes
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index 444e88332b..c223c1c62b 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -435,7 +435,7 @@ class DedicatedFormatWriter(DataWriter):
stats_columns = self.normal_columns if metadata_stats_enabled else []
value_stats = self._collect_value_stats(data, stats_columns)
- self.sequence_generator.start = self.sequence_generator.current
+ min_seq, max_seq = self._append_file_sequence_range(data.num_rows)
return DataFileMeta.create(
file_name=file_name,
@@ -445,8 +445,8 @@ class DedicatedFormatWriter(DataWriter):
max_key=GenericRow([], []),
key_stats=SimpleStats.empty_stats(),
value_stats=value_stats,
- min_sequence_number=-1,
- max_sequence_number=-1,
+ min_sequence_number=min_seq,
+ max_sequence_number=max_seq,
schema_id=self.table.table_schema.id,
level=0,
extra_files=[],