This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 38022feae0 [python] Fix blob column updates with rolling files (#8156)
38022feae0 is described below
commit 38022feae0daa1dcd8dd2dd2aa449083dc6a13e0
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jun 8 18:58:46 2026 +0800
[python] Fix blob column updates with rolling files (#8156)
Before this PR, when a BLOB column update rolled over into several blob
files, every rolled file was assigned the same `first_row_id`. As a
result, valid BLOB column updates could fail during commit with
`Row ID existence conflict`, because the metadata of the dedicated blob
files did not describe the row-id ranges those files actually covered.
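This PR assigns contiguous row-id ranges to rolled blob files (per blob
column, in write/rolling order). It also relaxes the row-id existence
check for dedicated BLOB files: in addition to the exact
(`first_row_id`, `row_count`) match, a blob file now passes the check
when its row-id range is fully covered by the normal (non-blob) data
files of the same partition and bucket.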
---
paimon-python/pypaimon/tests/blob_table_test.py | 170 +++++++++++++++++++++
.../tests/write/conflict_detection_test.py | 36 +++++
.../pypaimon/write/commit/conflict_detection.py | 14 ++
.../pypaimon/write/table_update_by_row_id.py | 43 +++++-
paimon-python/pypaimon/write/writer/blob_writer.py | 2 +-
5 files changed, 261 insertions(+), 4 deletions(-)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py
b/paimon-python/pypaimon/tests/blob_table_test.py
index e3af63ad52..9d87716a08 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1135,6 +1135,176 @@ class DedicatedFormatWriterTest(unittest.TestCase):
3: b'blob-3',
})
+ def test_update_blob_column_with_rolling_files(self):
+ from pypaimon import Schema
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '1KB',
+ }
+ )
+ self.catalog.create_table('test_db.blob_update_column_rolling',
schema, False)
+ table = self.catalog.get_table('test_db.blob_update_column_rolling')
+
+ initial = pa.Table.from_pydict({
+ 'id': [1, 2, 3, 4, 5],
+ 'blob_data': [b'blob-1', b'blob-2', b'blob-3', b'blob-4',
b'blob-5'],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(initial)
+ write_builder.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
+ row_id_builder = table.new_read_builder().with_projection(['id',
'_ROW_ID'])
+ row_id_result = row_id_builder.new_read().to_arrow(
+ row_id_builder.new_scan().plan().splits())
+ row_ids_by_id = {
+ row['id']: row['_ROW_ID']
+ for row in row_id_result.select(['id', '_ROW_ID']).to_pylist()
+ }
+
+ update_builder = table.new_batch_write_builder()
+ table_update =
update_builder.new_update().with_update_type(['blob_data'])
+ payload = b'x' * 2048
+ update_ids = [1, 2, 3, 4, 5]
+ update_data = pa.Table.from_pydict({
+ '_ROW_ID': pa.array([row_ids_by_id[id_] for id_ in update_ids],
type=pa.int64()),
+ 'blob_data': pa.array(
+ [payload + str(id_ - 1).encode() for id_ in update_ids],
+ type=pa.large_binary(),
+ ),
+ })
+ update_messages = table_update.update_by_arrow_with_row_id(update_data)
+
+ update_blob_files = [
+ file
+ for msg in update_messages
+ for file in msg.new_files
+ if file.file_name.endswith('.blob')
+ ]
+ self.assertGreater(len(update_blob_files), 1)
+
+ update_builder.new_commit().commit(update_messages)
+
+ read_builder = table.new_read_builder()
+ result =
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+ by_id = {
+ row['id']: row['blob_data']
+ for row in result.select(['id', 'blob_data']).to_pylist()
+ }
+ self.assertEqual(by_id, {
+ 1: payload + b'0',
+ 2: payload + b'1',
+ 3: payload + b'2',
+ 4: payload + b'3',
+ 5: payload + b'4',
+ })
+
+ def test_update_partial_blob_column_with_rolling_files(self):
+ from pypaimon import Schema
+ from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+ from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '1KB',
+ }
+ )
+
self.catalog.create_table('test_db.blob_update_partial_column_rolling', schema,
False)
+ table =
self.catalog.get_table('test_db.blob_update_partial_column_rolling')
+
+ initial = pa.Table.from_pydict({
+ 'id': [1, 2, 3, 4, 5],
+ 'blob_data': [b'blob-1', b'blob-2', b'blob-3', b'blob-4',
b'blob-5'],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(initial)
+ write_builder.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
+ row_id_builder = table.new_read_builder().with_projection(['id',
'_ROW_ID'])
+ row_id_result = row_id_builder.new_read().to_arrow(
+ row_id_builder.new_scan().plan().splits())
+ row_ids_by_id = {
+ row['id']: row['_ROW_ID']
+ for row in row_id_result.select(['id', '_ROW_ID']).to_pylist()
+ }
+
+ update_builder = table.new_batch_write_builder()
+ table_update =
update_builder.new_update().with_update_type(['blob_data'])
+ payload = b'x' * 2048
+ update_ids = [2, 4]
+ update_data = pa.Table.from_pydict({
+ '_ROW_ID': pa.array([row_ids_by_id[id_] for id_ in update_ids],
type=pa.int64()),
+ 'blob_data': pa.array(
+ [payload + str(id_).encode() for id_ in update_ids],
+ type=pa.large_binary(),
+ ),
+ })
+ update_messages = table_update.update_by_arrow_with_row_id(update_data)
+
+ update_blob_files = [
+ file
+ for msg in update_messages
+ for file in msg.new_files
+ if file.file_name.endswith('.blob')
+ ]
+ self.assertGreater(len(update_blob_files), 1)
+
+ update_blob_lengths = []
+ blob_fields = [field for field in table.fields if field.name ==
'blob_data']
+ for blob_file in update_blob_files:
+ blob_reader = FormatBlobReader(
+ file_io=table.file_io,
+ file_path=blob_file.file_path,
+ read_fields=['blob_data'],
+ full_fields=blob_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False,
+ )
+ update_blob_lengths.extend(blob_reader.blob_lengths)
+ blob_reader.close()
+ self.assertEqual(
+ update_blob_lengths.count(BlobFormatWriter.PLACE_HOLDER_LENGTH),
+ 3,
+ )
+
+ update_builder.new_commit().commit(update_messages)
+
+ read_builder = table.new_read_builder()
+ result =
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+ by_id = {
+ row['id']: row['blob_data']
+ for row in result.select(['id', 'blob_data']).to_pylist()
+ }
+ self.assertEqual(by_id, {
+ 1: b'blob-1',
+ 2: payload + b'2',
+ 3: b'blob-3',
+ 4: payload + b'4',
+ 5: b'blob-5',
+ })
+
def test_blob_write_read_partition(self):
"""Test complete end-to-end blob functionality: write blob data and
read it back to verify correctness."""
from pypaimon import Schema
diff --git a/paimon-python/pypaimon/tests/write/conflict_detection_test.py
b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
index 302a7d801f..72ac50f79b 100644
--- a/paimon-python/pypaimon/tests/write/conflict_detection_test.py
+++ b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
@@ -127,6 +127,42 @@ class TestCheckRowIdExistence(unittest.TestCase):
self.assertIsNotNone(result)
self.assertIn("Row ID existence conflict", str(result))
+ def test_no_conflict_when_blob_file_range_is_covered(self):
+ detection = self._make_detection()
+ base = [_make_entry("f1", kind=0, first_row_id=0, row_count=100)]
+ delta = [_make_entry("p1.blob", kind=0, first_row_id=20, row_count=10)]
+ self.assertIsNone(
+ detection.check_row_id_existence(base, delta, next_row_id=200))
+
+ def test_conflict_when_blob_file_range_is_not_covered(self):
+ detection = self._make_detection()
+ base = [_make_entry("f1", kind=0, first_row_id=0, row_count=100)]
+ delta = [_make_entry("p1.blob", kind=0, first_row_id=95, row_count=10)]
+ result = detection.check_row_id_existence(base, delta, next_row_id=200)
+ self.assertIsNotNone(result)
+ self.assertIn("Row ID existence conflict", str(result))
+
+ def
test_no_conflict_when_blob_file_range_is_covered_by_multiple_files(self):
+ detection = self._make_detection()
+ base = [
+ _make_entry("f1", kind=0, first_row_id=0, row_count=50),
+ _make_entry("f2", kind=0, first_row_id=50, row_count=50),
+ ]
+ delta = [_make_entry("p1.blob", kind=0, first_row_id=25, row_count=50)]
+ self.assertIsNone(
+ detection.check_row_id_existence(base, delta, next_row_id=200))
+
+ def
test_conflict_when_blob_file_range_is_only_covered_by_base_blob_file(self):
+ detection = self._make_detection()
+ base = [
+ _make_entry("f1", kind=0, first_row_id=0, row_count=50),
+ _make_entry("p0.blob", kind=0, first_row_id=50, row_count=50),
+ ]
+ delta = [_make_entry("p1.blob", kind=0, first_row_id=60, row_count=10)]
+ result = detection.check_row_id_existence(base, delta, next_row_id=200)
+ self.assertIsNotNone(result)
+ self.assertIn("Row ID existence conflict", str(result))
+
def test_skip_newly_appended_files(self):
detection = self._make_detection()
base = []
diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py
b/paimon-python/pypaimon/write/commit/conflict_detection.py
index fc92dc838b..0d04f464ce 100644
--- a/paimon-python/pypaimon/write/commit/conflict_detection.py
+++ b/paimon-python/pypaimon/write/commit/conflict_detection.py
@@ -206,13 +206,27 @@ class ConflictDetection:
return None
existing_index = set()
+ existing_ranges = {}
for base in base_entries:
if base.file.first_row_id is not None:
existing_index.add((
base.partition, base.bucket,
base.file.first_row_id, base.file.row_count))
+ if not DataFileMeta.is_blob_file(base.file.file_name):
+ existing_ranges.setdefault((base.partition, base.bucket),
[]).append(
+ base.file.row_id_range())
+
+ existing_ranges = {
+ key: Range.sort_and_merge_overlap(ranges, True, True)
+ for key, ranges in existing_ranges.items()
+ }
for entry in files_to_check:
+ if DataFileMeta.is_blob_file(entry.file.file_name):
+ base_ranges = existing_ranges.get((entry.partition,
entry.bucket), [])
+ if not entry.file.row_id_range().exclude(base_ranges):
+ continue
+
key = (entry.partition, entry.bucket,
entry.file.first_row_id, entry.file.row_count)
if key not in existing_index:
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py
b/paimon-python/pypaimon/write/table_update_by_row_id.py
index 34816d6ffb..c45a705d00 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -375,9 +375,8 @@ class TableUpdateByRowId:
new_files.extend(blob_writer.prepare_commit())
if new_files:
- for file in new_files:
- file.first_row_id = first_row_id
- file.write_cols = file.write_cols or column_names
+ self._assign_update_file_metadata(
+ new_files, first_row_id, column_names,
original_data.num_rows)
self.commit_messages.append(
CommitMessage(
partition=partition_tuple,
@@ -391,3 +390,41 @@ class TableUpdateByRowId:
file_store_write.close()
for blob_writer in blob_writers:
blob_writer.close()
+
+ @staticmethod
+ def _assign_update_file_metadata(new_files: List[DataFileMeta],
first_row_id: int,
+ column_names: List[str],
expected_row_count: int):
+ blob_end = first_row_id + expected_row_count
+ blob_starts = {}
+ # BlobWriter.prepare_commit preserves write/rolling order, which is
required
+ # for assigning continuous row-id ranges to rolled blob files.
+ for file in new_files:
+ file.write_cols = file.write_cols or column_names
+ if DataFileMeta.is_blob_file(file.file_name):
+ if len(file.write_cols) != 1:
+ raise RuntimeError(
+ f"Blob update file {file.file_name} should contain "
+ f"exactly one write column, got {file.write_cols}")
+ blob_column = file.write_cols[0]
+ blob_start = blob_starts.get(blob_column, first_row_id)
+ next_blob_start = blob_start + file.row_count
+ if next_blob_start > blob_end:
+ raise RuntimeError(
+ f"Blob update file {file.file_name} row-id range "
+ f"[{blob_start}, {next_blob_start - 1}] exceeds target
range "
+ f"[{first_row_id}, {blob_end - 1}]")
+ file.first_row_id = blob_start
+ # Only update-by-row-id blob delta files use the 0/0 sentinel;
+ # regular blob writes keep their per-row sequence range.
+ file.min_sequence_number = 0
+ file.max_sequence_number = 0
+ blob_starts[blob_column] = next_blob_start
+ else:
+ file.first_row_id = first_row_id
+
+ for blob_column, next_blob_start in blob_starts.items():
+ if next_blob_start != blob_end:
+ raise RuntimeError(
+ f"Blob update column {blob_column} covers row ids "
+ f"[{first_row_id}, {next_blob_start - 1}], expected "
+ f"[{first_row_id}, {blob_end - 1}]")
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py
b/paimon-python/pypaimon/write/writer/blob_writer.py
index 24f64e4dff..96ed205ebf 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -213,7 +213,7 @@ class BlobWriter(AppendOnlyDataWriter):
))
def prepare_commit(self):
- """Prepare commit, ensuring all data is written."""
+ """Prepare commit, preserving blob files in write/rolling order."""
# Close current file if open.
if self.current_writer is not None:
self.close_current_writer()