This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 12e1b6039c [python] Fix blob table read failure when filtering by _ROW_ID (#7390)
12e1b6039c is described below
commit 12e1b6039cce4d09ba1201ea9241739a5e79b6fa
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Mar 10 12:08:40 2026 +0800
[python] Fix blob table read failure when filtering by _ROW_ID (#7390)
When a blob table has multiple blob files (due to
`blob.target-file-size` rolling), filtering by `_ROW_ID` fails:
```ValueError: All files in a field merge split should have the same row count.```
---
paimon-python/pypaimon/read/reader/field_bunch.py | 47 +++++++++++++---------
paimon-python/pypaimon/read/split_read.py | 22 ++++++-----
paimon-python/pypaimon/tests/blob_table_test.py | 48 +++++++++++++++++++++++
3 files changed, 90 insertions(+), 27 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py b/paimon-python/pypaimon/read/reader/field_bunch.py
index 0e20654b69..fccf2cb325 100644
--- a/paimon-python/pypaimon/read/reader/field_bunch.py
+++ b/paimon-python/pypaimon/read/reader/field_bunch.py
@@ -54,9 +54,10 @@ class DataBunch(FieldBunch):
class BlobBunch(FieldBunch):
"""Files for partial field (blob files)."""
- def __init__(self, expected_row_count: int):
+ def __init__(self, expected_row_count: int, row_id_push_down: bool = False):
self._files: List[DataFileMeta] = []
self.expected_row_count = expected_row_count
+ self.row_id_push_down = row_id_push_down
self.latest_first_row_id = -1
self.expected_next_first_row_id = -1
self.latest_max_sequence_number = -1
@@ -76,26 +77,36 @@ class BlobBunch(FieldBunch):
if self._files:
first_row_id = file.first_row_id
- if first_row_id < self.expected_next_first_row_id:
- if file.max_sequence_number >= self.latest_max_sequence_number:
+ if self.row_id_push_down:
+ if first_row_id < self.expected_next_first_row_id:
+ if file.max_sequence_number > self.latest_max_sequence_number:
+ last_file = self._files.pop()
+ self._row_count -= last_file.row_count
+ else:
+ return
+ else:
+ if first_row_id < self.expected_next_first_row_id:
+ if file.max_sequence_number >= self.latest_max_sequence_number:
+ raise ValueError(
+ "Blob file with overlapping row id should have "
+ "decreasing sequence number."
+ )
+ return
+ elif first_row_id > self.expected_next_first_row_id:
raise ValueError(
- "Blob file with overlapping row id should have decreasing sequence number."
+ f"Blob file first row id should be continuous, expect "
+ f"{self.expected_next_first_row_id} but got {first_row_id}"
)
- return
- elif first_row_id > self.expected_next_first_row_id:
- raise ValueError(
- f"Blob file first row id should be continuous, expect "
- f"{self.expected_next_first_row_id} but got {first_row_id}"
- )
- if file.schema_id != self._files[0].schema_id:
- raise ValueError(
- "All files in a blob bunch should have the same schema id."
- )
- if file.write_cols != self._files[0].write_cols:
- raise ValueError(
- "All files in a blob bunch should have the same write columns."
- )
+ if self._files:
+ if file.schema_id != self._files[0].schema_id:
+ raise ValueError(
+ "All files in a blob bunch should have the same schema id."
+ )
+ if file.write_cols != self._files[0].write_cols:
+ raise ValueError(
+ "All files in a blob bunch should have the same write columns."
+ )
self._files.append(file)
self._row_count += file.row_count
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 6034723647..3d2e8975c5 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -550,17 +550,20 @@ class DataEvolutionSplitRead(SplitRead):
# Split field bunches
fields_files = self._split_field_bunches(need_merge_files)
- # Validate row counts and first row IDs
+ # Validate row counts and first row IDs (skip when row ranges are pushed down)
row_count = fields_files[0].row_count()
first_row_id = fields_files[0].files()[0].first_row_id
- for bunch in fields_files:
- if bunch.row_count() != row_count:
- raise ValueError("All files in a field merge split should have the same row count.")
- if bunch.files()[0].first_row_id != first_row_id:
- raise ValueError(
- "All files in a field merge split should have the same first row id and could not be null."
- )
+ if self.row_ranges is None:
+ for bunch in fields_files:
+ if bunch.row_count() != row_count:
+ raise ValueError(
+ "All files in a field merge split should have the same row count.")
+ if bunch.files()[0].first_row_id != first_row_id:
+ raise ValueError(
+ "All files in a field merge split should have the same "
+ "first row id and could not be null."
+ )
# Create the union reader
all_read_fields = self.read_fields
@@ -646,12 +649,13 @@ class DataEvolutionSplitRead(SplitRead):
fields_files = []
blob_bunch_map = {}
row_count = -1
+ row_id_push_down = self.row_ranges is not None
for file in need_merge_files:
if DataFileMeta.is_blob_file(file.file_name):
field_id = self._get_field_id_from_write_cols(file)
if field_id not in blob_bunch_map:
- blob_bunch_map[field_id] = BlobBunch(row_count)
+ blob_bunch_map[field_id] = BlobBunch(row_count, row_id_push_down)
blob_bunch_map[field_id].add(file)
else:
# Normal file, just add it to the current merge split
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index f49f365f89..68cfbaf768 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -2913,6 +2913,54 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertEqual(list(df['text']), [f'text_{i}' for i in range(num_rows)])
self.assertEqual(list(df['video_path']), [f'video_{i}.mp4' for i in range(num_rows)])
+ def test_blob_with_row_id_equal(self):
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '500 KB',
+ }
+ )
+ self.catalog.create_table('test_db.blob_rowid_equal', schema, False)
+ table = self.catalog.get_table('test_db.blob_rowid_equal')
+
+ blob_bytes = os.urandom(248 * 1024)
+ write_builder = table.new_batch_write_builder()
+ tw = write_builder.new_write()
+ tc = write_builder.new_commit()
+ batch = pa.Table.from_pydict({
+ 'id': list(range(20)),
+ 'name': [f'item_{i}' for i in range(20)],
+ 'data': [blob_bytes] * 20,
+ }, schema=pa_schema)
+ tw.write_arrow(batch)
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ from pypaimon.common.predicate_builder import PredicateBuilder
+ from pypaimon.table.special_fields import SpecialFields
+
+ rb = table.new_read_builder()
+ fields = list(table.fields)
+ fields.append(SpecialFields.ROW_ID)
+ pb = PredicateBuilder(fields)
+ pred = pb.equal(SpecialFields.ROW_ID.name, 5)
+ rb.with_filter(pred)
+
+ scan = rb.new_scan()
+ splits = scan.plan().splits()
+ read = rb.new_read()
+ result = read.to_arrow(splits)
+ self.assertEqual(result.num_rows, 1)
+
if __name__ == '__main__':
unittest.main()