This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 38022feae0 [python] Fix blob column updates with rolling files (#8156)
38022feae0 is described below

commit 38022feae0daa1dcd8dd2dd2aa449083dc6a13e0
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jun 8 18:58:46 2026 +0800

    [python] Fix blob column updates with rolling files (#8156)
    
    Before this PR, all rolled blob update files were assigned the same
    `first_row_id`. As a result, valid BLOB column updates could fail during
    commit with `Row ID existence conflict`, because the dedicated blob
    files no longer described the correct row-id ranges.
    
    This PR assigns proper contiguous row-id ranges to rolled blob files and
    relaxes row-id existence checking for dedicated BLOB files to validate
    range coverage by normal data files.
---
 paimon-python/pypaimon/tests/blob_table_test.py    | 170 +++++++++++++++++++++
 .../tests/write/conflict_detection_test.py         |  36 +++++
 .../pypaimon/write/commit/conflict_detection.py    |  14 ++
 .../pypaimon/write/table_update_by_row_id.py       |  43 +++++-
 paimon-python/pypaimon/write/writer/blob_writer.py |   2 +-
 5 files changed, 261 insertions(+), 4 deletions(-)

diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index e3af63ad52..9d87716a08 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1135,6 +1135,176 @@ class DedicatedFormatWriterTest(unittest.TestCase):
             3: b'blob-3',
         })
 
+    def test_update_blob_column_with_rolling_files(self):
+        from pypaimon import Schema
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '1KB',
+            }
+        )
+        self.catalog.create_table('test_db.blob_update_column_rolling', 
schema, False)
+        table = self.catalog.get_table('test_db.blob_update_column_rolling')
+
+        initial = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'blob_data': [b'blob-1', b'blob-2', b'blob-3', b'blob-4', 
b'blob-5'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(initial)
+        write_builder.new_commit().commit(writer.prepare_commit())
+        writer.close()
+
+        row_id_builder = table.new_read_builder().with_projection(['id', 
'_ROW_ID'])
+        row_id_result = row_id_builder.new_read().to_arrow(
+            row_id_builder.new_scan().plan().splits())
+        row_ids_by_id = {
+            row['id']: row['_ROW_ID']
+            for row in row_id_result.select(['id', '_ROW_ID']).to_pylist()
+        }
+
+        update_builder = table.new_batch_write_builder()
+        table_update = 
update_builder.new_update().with_update_type(['blob_data'])
+        payload = b'x' * 2048
+        update_ids = [1, 2, 3, 4, 5]
+        update_data = pa.Table.from_pydict({
+            '_ROW_ID': pa.array([row_ids_by_id[id_] for id_ in update_ids], 
type=pa.int64()),
+            'blob_data': pa.array(
+                [payload + str(id_ - 1).encode() for id_ in update_ids],
+                type=pa.large_binary(),
+            ),
+        })
+        update_messages = table_update.update_by_arrow_with_row_id(update_data)
+
+        update_blob_files = [
+            file
+            for msg in update_messages
+            for file in msg.new_files
+            if file.file_name.endswith('.blob')
+        ]
+        self.assertGreater(len(update_blob_files), 1)
+
+        update_builder.new_commit().commit(update_messages)
+
+        read_builder = table.new_read_builder()
+        result = 
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+        by_id = {
+            row['id']: row['blob_data']
+            for row in result.select(['id', 'blob_data']).to_pylist()
+        }
+        self.assertEqual(by_id, {
+            1: payload + b'0',
+            2: payload + b'1',
+            3: payload + b'2',
+            4: payload + b'3',
+            5: payload + b'4',
+        })
+
+    def test_update_partial_blob_column_with_rolling_files(self):
+        from pypaimon import Schema
+        from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+        from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '1KB',
+            }
+        )
+        
self.catalog.create_table('test_db.blob_update_partial_column_rolling', schema, 
False)
+        table = 
self.catalog.get_table('test_db.blob_update_partial_column_rolling')
+
+        initial = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'blob_data': [b'blob-1', b'blob-2', b'blob-3', b'blob-4', 
b'blob-5'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(initial)
+        write_builder.new_commit().commit(writer.prepare_commit())
+        writer.close()
+
+        row_id_builder = table.new_read_builder().with_projection(['id', 
'_ROW_ID'])
+        row_id_result = row_id_builder.new_read().to_arrow(
+            row_id_builder.new_scan().plan().splits())
+        row_ids_by_id = {
+            row['id']: row['_ROW_ID']
+            for row in row_id_result.select(['id', '_ROW_ID']).to_pylist()
+        }
+
+        update_builder = table.new_batch_write_builder()
+        table_update = 
update_builder.new_update().with_update_type(['blob_data'])
+        payload = b'x' * 2048
+        update_ids = [2, 4]
+        update_data = pa.Table.from_pydict({
+            '_ROW_ID': pa.array([row_ids_by_id[id_] for id_ in update_ids], 
type=pa.int64()),
+            'blob_data': pa.array(
+                [payload + str(id_).encode() for id_ in update_ids],
+                type=pa.large_binary(),
+            ),
+        })
+        update_messages = table_update.update_by_arrow_with_row_id(update_data)
+
+        update_blob_files = [
+            file
+            for msg in update_messages
+            for file in msg.new_files
+            if file.file_name.endswith('.blob')
+        ]
+        self.assertGreater(len(update_blob_files), 1)
+
+        update_blob_lengths = []
+        blob_fields = [field for field in table.fields if field.name == 
'blob_data']
+        for blob_file in update_blob_files:
+            blob_reader = FormatBlobReader(
+                file_io=table.file_io,
+                file_path=blob_file.file_path,
+                read_fields=['blob_data'],
+                full_fields=blob_fields,
+                push_down_predicate=None,
+                blob_as_descriptor=False,
+            )
+            update_blob_lengths.extend(blob_reader.blob_lengths)
+            blob_reader.close()
+        self.assertEqual(
+            update_blob_lengths.count(BlobFormatWriter.PLACE_HOLDER_LENGTH),
+            3,
+        )
+
+        update_builder.new_commit().commit(update_messages)
+
+        read_builder = table.new_read_builder()
+        result = 
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+        by_id = {
+            row['id']: row['blob_data']
+            for row in result.select(['id', 'blob_data']).to_pylist()
+        }
+        self.assertEqual(by_id, {
+            1: b'blob-1',
+            2: payload + b'2',
+            3: b'blob-3',
+            4: payload + b'4',
+            5: b'blob-5',
+        })
+
     def test_blob_write_read_partition(self):
         """Test complete end-to-end blob functionality: write blob data and 
read it back to verify correctness."""
         from pypaimon import Schema
diff --git a/paimon-python/pypaimon/tests/write/conflict_detection_test.py 
b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
index 302a7d801f..72ac50f79b 100644
--- a/paimon-python/pypaimon/tests/write/conflict_detection_test.py
+++ b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
@@ -127,6 +127,42 @@ class TestCheckRowIdExistence(unittest.TestCase):
         self.assertIsNotNone(result)
         self.assertIn("Row ID existence conflict", str(result))
 
+    def test_no_conflict_when_blob_file_range_is_covered(self):
+        detection = self._make_detection()
+        base = [_make_entry("f1", kind=0, first_row_id=0, row_count=100)]
+        delta = [_make_entry("p1.blob", kind=0, first_row_id=20, row_count=10)]
+        self.assertIsNone(
+            detection.check_row_id_existence(base, delta, next_row_id=200))
+
+    def test_conflict_when_blob_file_range_is_not_covered(self):
+        detection = self._make_detection()
+        base = [_make_entry("f1", kind=0, first_row_id=0, row_count=100)]
+        delta = [_make_entry("p1.blob", kind=0, first_row_id=95, row_count=10)]
+        result = detection.check_row_id_existence(base, delta, next_row_id=200)
+        self.assertIsNotNone(result)
+        self.assertIn("Row ID existence conflict", str(result))
+
+    def 
test_no_conflict_when_blob_file_range_is_covered_by_multiple_files(self):
+        detection = self._make_detection()
+        base = [
+            _make_entry("f1", kind=0, first_row_id=0, row_count=50),
+            _make_entry("f2", kind=0, first_row_id=50, row_count=50),
+        ]
+        delta = [_make_entry("p1.blob", kind=0, first_row_id=25, row_count=50)]
+        self.assertIsNone(
+            detection.check_row_id_existence(base, delta, next_row_id=200))
+
+    def 
test_conflict_when_blob_file_range_is_only_covered_by_base_blob_file(self):
+        detection = self._make_detection()
+        base = [
+            _make_entry("f1", kind=0, first_row_id=0, row_count=50),
+            _make_entry("p0.blob", kind=0, first_row_id=50, row_count=50),
+        ]
+        delta = [_make_entry("p1.blob", kind=0, first_row_id=60, row_count=10)]
+        result = detection.check_row_id_existence(base, delta, next_row_id=200)
+        self.assertIsNotNone(result)
+        self.assertIn("Row ID existence conflict", str(result))
+
     def test_skip_newly_appended_files(self):
         detection = self._make_detection()
         base = []
diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py 
b/paimon-python/pypaimon/write/commit/conflict_detection.py
index fc92dc838b..0d04f464ce 100644
--- a/paimon-python/pypaimon/write/commit/conflict_detection.py
+++ b/paimon-python/pypaimon/write/commit/conflict_detection.py
@@ -206,13 +206,27 @@ class ConflictDetection:
             return None
 
         existing_index = set()
+        existing_ranges = {}
         for base in base_entries:
             if base.file.first_row_id is not None:
                 existing_index.add((
                     base.partition, base.bucket,
                     base.file.first_row_id, base.file.row_count))
+                if not DataFileMeta.is_blob_file(base.file.file_name):
+                    existing_ranges.setdefault((base.partition, base.bucket), 
[]).append(
+                        base.file.row_id_range())
+
+        existing_ranges = {
+            key: Range.sort_and_merge_overlap(ranges, True, True)
+            for key, ranges in existing_ranges.items()
+        }
 
         for entry in files_to_check:
+            if DataFileMeta.is_blob_file(entry.file.file_name):
+                base_ranges = existing_ranges.get((entry.partition, 
entry.bucket), [])
+                if not entry.file.row_id_range().exclude(base_ranges):
+                    continue
+
             key = (entry.partition, entry.bucket,
                    entry.file.first_row_id, entry.file.row_count)
             if key not in existing_index:
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py 
b/paimon-python/pypaimon/write/table_update_by_row_id.py
index 34816d6ffb..c45a705d00 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -375,9 +375,8 @@ class TableUpdateByRowId:
                 new_files.extend(blob_writer.prepare_commit())
 
             if new_files:
-                for file in new_files:
-                    file.first_row_id = first_row_id
-                    file.write_cols = file.write_cols or column_names
+                self._assign_update_file_metadata(
+                    new_files, first_row_id, column_names, 
original_data.num_rows)
                 self.commit_messages.append(
                     CommitMessage(
                         partition=partition_tuple,
@@ -391,3 +390,41 @@ class TableUpdateByRowId:
                 file_store_write.close()
             for blob_writer in blob_writers:
                 blob_writer.close()
+
+    @staticmethod
+    def _assign_update_file_metadata(new_files: List[DataFileMeta], 
first_row_id: int,
+                                     column_names: List[str], 
expected_row_count: int):
+        blob_end = first_row_id + expected_row_count
+        blob_starts = {}
+        # BlobWriter.prepare_commit preserves write/rolling order, which is 
required
+        # for assigning continuous row-id ranges to rolled blob files.
+        for file in new_files:
+            file.write_cols = file.write_cols or column_names
+            if DataFileMeta.is_blob_file(file.file_name):
+                if len(file.write_cols) != 1:
+                    raise RuntimeError(
+                        f"Blob update file {file.file_name} should contain "
+                        f"exactly one write column, got {file.write_cols}")
+                blob_column = file.write_cols[0]
+                blob_start = blob_starts.get(blob_column, first_row_id)
+                next_blob_start = blob_start + file.row_count
+                if next_blob_start > blob_end:
+                    raise RuntimeError(
+                        f"Blob update file {file.file_name} row-id range "
+                        f"[{blob_start}, {next_blob_start - 1}] exceeds target 
range "
+                        f"[{first_row_id}, {blob_end - 1}]")
+                file.first_row_id = blob_start
+                # Only update-by-row-id blob delta files use the 0/0 sentinel;
+                # regular blob writes keep their per-row sequence range.
+                file.min_sequence_number = 0
+                file.max_sequence_number = 0
+                blob_starts[blob_column] = next_blob_start
+            else:
+                file.first_row_id = first_row_id
+
+        for blob_column, next_blob_start in blob_starts.items():
+            if next_blob_start != blob_end:
+                raise RuntimeError(
+                    f"Blob update column {blob_column} covers row ids "
+                    f"[{first_row_id}, {next_blob_start - 1}], expected "
+                    f"[{first_row_id}, {blob_end - 1}]")
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py 
b/paimon-python/pypaimon/write/writer/blob_writer.py
index 24f64e4dff..96ed205ebf 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -213,7 +213,7 @@ class BlobWriter(AppendOnlyDataWriter):
         ))
 
     def prepare_commit(self):
-        """Prepare commit, ensuring all data is written."""
+        """Prepare commit, preserving blob files in write/rolling order."""
         # Close current file if open.
         if self.current_writer is not None:
             self.close_current_writer()

Reply via email to