This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 29561ea43a [python] Fix partial update for normal column on blob/vector tables (#8147)
29561ea43a is described below

commit 29561ea43acac5878adc361ded2071a46f994fd1
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Jun 7 22:53:08 2026 +0800

    [python] Fix partial update for normal column on blob/vector tables (#8147)
---
 paimon-python/pypaimon/tests/blob_table_test.py    | 128 ++++++++++++++++++++
 .../tests/ray_data_evolution_merge_into_test.py    |   1 -
 paimon-python/pypaimon/tests/vector_table_test.py  | 131 +++++++++++++++++++++
 .../pypaimon/write/writer/data_vector_writer.py    |   6 +-
 paimon-python/pypaimon/write/writer/data_writer.py |  10 ++
 .../write/writer/dedicated_format_writer.py        |   6 +-
 6 files changed, 276 insertions(+), 6 deletions(-)

diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 7619fcb6de..e3af63ad52 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -3804,6 +3804,134 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         table = self.catalog.get_table('test_db.nested_blob_name_no_error')
         self.assertIsNotNone(table)
 
+    def test_blob_table_partial_update_non_blob_column(self):
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        }
+        s = Schema.from_pyarrow_schema(pa_schema, options=opts)
+        table_name = 'test_db.blob_de_seq'
+        self.catalog.create_table(table_name, s, False)
+
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pydict(
+            {'id': [1, 2], 'name': ['a', 'b'], 'picture': [None, None]},
+            schema=pa_schema,
+        ))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        rb = rb.with_projection(['name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits)
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated', 'updated'], type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['name'], ['updated', 'updated'])
+
+    def test_blob_table_partial_update_non_blob_column_with_rolling_files(self):
+        from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'target-file-size': '1KB',
+        }
+        s = Schema.from_pyarrow_schema(pa_schema, options=opts)
+        table_name = 'test_db.blob_de_seq_rolling'
+        self.catalog.create_table(table_name, s, False)
+
+        write_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write().with_write_type(['id', 'name'])
+        for start in (0, 1000):
+            ids = list(range(start, start + 1000))
+            w.write_arrow(pa.Table.from_pydict(
+                {
+                    'id': ids,
+                    'name': [f'name_{i}_' + 'x' * 2048 for i in ids],
+                },
+                schema=write_schema,
+            ))
+        commit_messages = w.prepare_commit()
+        normal_files = [
+            f for msg in commit_messages for f in msg.new_files
+            if not DataFileMeta.is_blob_file(f.file_name)
+            and not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(normal_files), 2)
+        for file in normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        wb.new_commit().commit(commit_messages)
+        w.close()
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits).sort_by('id')
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated'] * source.num_rows, type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        update_normal_files = [
+            f for msg in msgs for f in msg.new_files
+            if not DataFileMeta.is_blob_file(f.file_name)
+            and not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(update_normal_files), 2)
+        for file in update_normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name'])
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['id'], list(range(2000)))
+        self.assertEqual(result['name'], ['updated'] * 2000)
+
 
 class GetBlobTest(unittest.TestCase):
 
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index 7e365009db..cd2745dc0f 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -1957,7 +1957,6 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
         self.assertEqual(out['name'], ['old', 'young', 'senior'])
         self.assertEqual(out['age'], [10, 20, 30])
 
-    @unittest.skip("blocked by blob DE sequence bug fix, see PR #8147")
     @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
     def test_self_merge_blob_source_condition(self):
         blob_schema = pa.schema([
diff --git a/paimon-python/pypaimon/tests/vector_table_test.py b/paimon-python/pypaimon/tests/vector_table_test.py
index 75a97f19a8..0f96455bbd 100644
--- a/paimon-python/pypaimon/tests/vector_table_test.py
+++ b/paimon-python/pypaimon/tests/vector_table_test.py
@@ -340,6 +340,137 @@ class VectorTableWriteReadTest(unittest.TestCase):
 
         self.assertEqual(result.num_rows, 3)
 
+    def test_vector_table_partial_update_non_vector_column(self):
+        vector_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('embedding', pa.list_(pa.float32(), 4)),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+        }
+        s = Schema.from_pyarrow_schema(vector_schema, options=opts)
+        table_name = 'test_db.vector_de_seq'
+        self.catalog.create_table(table_name, s, False)
+
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pydict(
+            {
+                'id': [1, 2],
+                'name': ['a', 'b'],
+                'embedding': [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]],
+            },
+            schema=vector_schema,
+        ))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        rb = rb.with_projection(['name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits)
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated', 'updated'], type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['name'], ['updated', 'updated'])
+
+    def test_vector_table_partial_update_non_vector_column_with_rolling_files(self):
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        vector_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('embedding', pa.list_(pa.float32(), 4)),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+            'target-file-size': '1KB',
+        }
+        s = Schema.from_pyarrow_schema(vector_schema, options=opts)
+        table_name = 'test_db.vector_de_seq_rolling'
+        self.catalog.create_table(table_name, s, False)
+
+        write_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write().with_write_type(['id', 'name'])
+        for start in (0, 1000):
+            ids = list(range(start, start + 1000))
+            w.write_arrow(pa.Table.from_pydict(
+                {
+                    'id': ids,
+                    'name': [f'name_{i}_' + 'x' * 2048 for i in ids],
+                },
+                schema=write_schema,
+            ))
+        commit_messages = w.prepare_commit()
+        normal_files = [
+            f for msg in commit_messages for f in msg.new_files
+            if not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(normal_files), 2)
+        for file in normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        wb.new_commit().commit(commit_messages)
+        w.close()
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits).sort_by('id')
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated'] * source.num_rows, type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        update_normal_files = [
+            f for msg in msgs for f in msg.new_files
+            if not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(update_normal_files), 2)
+        for file in update_normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name'])
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['id'], list(range(2000)))
+        self.assertEqual(result['name'], ['updated'] * 2000)
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py b/paimon-python/pypaimon/write/writer/data_vector_writer.py
index d06d425936..e63a916ed4 100644
--- a/paimon-python/pypaimon/write/writer/data_vector_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py
@@ -226,6 +226,8 @@ class DataVectorWriter(DataWriter):
         stats_columns = self.normal_columns if metadata_stats_enabled else []
         value_stats = self._collect_value_stats(data, stats_columns)
 
+        min_seq, max_seq = self._append_file_sequence_range(data.num_rows)
+
         return DataFileMeta.create(
             file_name=file_name,
             file_size=self.file_io.get_file_size(file_path),
@@ -234,8 +236,8 @@ class DataVectorWriter(DataWriter):
             max_key=GenericRow([], []),
             key_stats=SimpleStats.empty_stats(),
             value_stats=value_stats,
-            min_sequence_number=-1,
-            max_sequence_number=-1,
+            min_sequence_number=min_seq,
+            max_sequence_number=max_seq,
             schema_id=self.table.table_schema.id,
             level=0,
             extra_files=[],
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 9f10e8a64e..e237bf9f37 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -164,6 +164,16 @@ class DataWriter(ABC):
     def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
         """Merge existing data with new data. Must be implemented by subclasses."""
 
+    def _append_file_sequence_range(self, row_count: int) -> Tuple[int, int]:
+        if row_count <= 0:
+            raise ValueError("row_count must be positive")
+
+        if self.options.data_evolution_enabled(False):
+            # Row-tracking commit stamps this sentinel range with the snapshot id.
+            return 0, row_count - 1
+
+        return -1, -1
+
     def _check_and_roll_if_needed(self):
         while self.pending_data is not None:
             current_size = self.pending_data.nbytes
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index 444e88332b..c223c1c62b 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -435,7 +435,7 @@ class DedicatedFormatWriter(DataWriter):
         stats_columns = self.normal_columns if metadata_stats_enabled else []
         value_stats = self._collect_value_stats(data, stats_columns)
 
-        self.sequence_generator.start = self.sequence_generator.current
+        min_seq, max_seq = self._append_file_sequence_range(data.num_rows)
 
         return DataFileMeta.create(
             file_name=file_name,
@@ -445,8 +445,8 @@ class DedicatedFormatWriter(DataWriter):
             max_key=GenericRow([], []),
             key_stats=SimpleStats.empty_stats(),
             value_stats=value_stats,
-            min_sequence_number=-1,
-            max_sequence_number=-1,
+            min_sequence_number=min_seq,
+            max_sequence_number=max_seq,
             schema_id=self.table.table_schema.id,
             level=0,
             extra_files=[],

Reply via email to