This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 12e1b6039c [python] Fix blob table read failure when filtering by 
_ROW_ID (#7390)
12e1b6039c is described below

commit 12e1b6039cce4d09ba1201ea9241739a5e79b6fa
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Mar 10 12:08:40 2026 +0800

    [python] Fix blob table read failure when filtering by _ROW_ID (#7390)
    
    When a blob table has multiple blob files (due to
    `blob.target-file-size` rolling), filtering by `_ROW_ID` fails:
    
    ```ValueError: All files in a field merge split should have the same row 
count.```
---
 paimon-python/pypaimon/read/reader/field_bunch.py | 47 +++++++++++++---------
 paimon-python/pypaimon/read/split_read.py         | 22 ++++++-----
 paimon-python/pypaimon/tests/blob_table_test.py   | 48 +++++++++++++++++++++++
 3 files changed, 90 insertions(+), 27 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py 
b/paimon-python/pypaimon/read/reader/field_bunch.py
index 0e20654b69..fccf2cb325 100644
--- a/paimon-python/pypaimon/read/reader/field_bunch.py
+++ b/paimon-python/pypaimon/read/reader/field_bunch.py
@@ -54,9 +54,10 @@ class DataBunch(FieldBunch):
 class BlobBunch(FieldBunch):
     """Files for partial field (blob files)."""
 
-    def __init__(self, expected_row_count: int):
+    def __init__(self, expected_row_count: int, row_id_push_down: bool = 
False):
         self._files: List[DataFileMeta] = []
         self.expected_row_count = expected_row_count
+        self.row_id_push_down = row_id_push_down
         self.latest_first_row_id = -1
         self.expected_next_first_row_id = -1
         self.latest_max_sequence_number = -1
@@ -76,26 +77,36 @@ class BlobBunch(FieldBunch):
 
         if self._files:
             first_row_id = file.first_row_id
-            if first_row_id < self.expected_next_first_row_id:
-                if file.max_sequence_number >= self.latest_max_sequence_number:
+            if self.row_id_push_down:
+                if first_row_id < self.expected_next_first_row_id:
+                    if file.max_sequence_number > 
self.latest_max_sequence_number:
+                        last_file = self._files.pop()
+                        self._row_count -= last_file.row_count
+                    else:
+                        return
+            else:
+                if first_row_id < self.expected_next_first_row_id:
+                    if file.max_sequence_number >= 
self.latest_max_sequence_number:
+                        raise ValueError(
+                            "Blob file with overlapping row id should have "
+                            "decreasing sequence number."
+                        )
+                    return
+                elif first_row_id > self.expected_next_first_row_id:
                     raise ValueError(
-                        "Blob file with overlapping row id should have 
decreasing sequence number."
+                        f"Blob file first row id should be continuous, expect "
+                        f"{self.expected_next_first_row_id} but got 
{first_row_id}"
                     )
-                return
-            elif first_row_id > self.expected_next_first_row_id:
-                raise ValueError(
-                    f"Blob file first row id should be continuous, expect "
-                    f"{self.expected_next_first_row_id} but got {first_row_id}"
-                )
 
-            if file.schema_id != self._files[0].schema_id:
-                raise ValueError(
-                    "All files in a blob bunch should have the same schema id."
-                )
-            if file.write_cols != self._files[0].write_cols:
-                raise ValueError(
-                    "All files in a blob bunch should have the same write 
columns."
-                )
+            if self._files:
+                if file.schema_id != self._files[0].schema_id:
+                    raise ValueError(
+                        "All files in a blob bunch should have the same schema 
id."
+                    )
+                if file.write_cols != self._files[0].write_cols:
+                    raise ValueError(
+                        "All files in a blob bunch should have the same write 
columns."
+                    )
 
         self._files.append(file)
         self._row_count += file.row_count
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 6034723647..3d2e8975c5 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -550,17 +550,20 @@ class DataEvolutionSplitRead(SplitRead):
         # Split field bunches
         fields_files = self._split_field_bunches(need_merge_files)
 
-        # Validate row counts and first row IDs
+        # Validate row counts and first row IDs (skip when row ranges are 
pushed down)
         row_count = fields_files[0].row_count()
         first_row_id = fields_files[0].files()[0].first_row_id
 
-        for bunch in fields_files:
-            if bunch.row_count() != row_count:
-                raise ValueError("All files in a field merge split should have 
the same row count.")
-            if bunch.files()[0].first_row_id != first_row_id:
-                raise ValueError(
-                    "All files in a field merge split should have the same 
first row id and could not be null."
-                )
+        if self.row_ranges is None:
+            for bunch in fields_files:
+                if bunch.row_count() != row_count:
+                    raise ValueError(
+                        "All files in a field merge split should have the same 
row count.")
+                if bunch.files()[0].first_row_id != first_row_id:
+                    raise ValueError(
+                        "All files in a field merge split should have the same 
"
+                        "first row id and could not be null."
+                    )
 
         # Create the union reader
         all_read_fields = self.read_fields
@@ -646,12 +649,13 @@ class DataEvolutionSplitRead(SplitRead):
         fields_files = []
         blob_bunch_map = {}
         row_count = -1
+        row_id_push_down = self.row_ranges is not None
 
         for file in need_merge_files:
             if DataFileMeta.is_blob_file(file.file_name):
                 field_id = self._get_field_id_from_write_cols(file)
                 if field_id not in blob_bunch_map:
-                    blob_bunch_map[field_id] = BlobBunch(row_count)
+                    blob_bunch_map[field_id] = BlobBunch(row_count, 
row_id_push_down)
                 blob_bunch_map[field_id].add(file)
             else:
                 # Normal file, just add it to the current merge split
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index f49f365f89..68cfbaf768 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -2913,6 +2913,54 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(list(df['text']), [f'text_{i}' for i in 
range(num_rows)])
         self.assertEqual(list(df['video_path']), [f'video_{i}.mp4' for i in 
range(num_rows)])
 
+    def test_blob_with_row_id_equal(self):
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '500 KB',
+            }
+        )
+        self.catalog.create_table('test_db.blob_rowid_equal', schema, False)
+        table = self.catalog.get_table('test_db.blob_rowid_equal')
+
+        blob_bytes = os.urandom(248 * 1024)
+        write_builder = table.new_batch_write_builder()
+        tw = write_builder.new_write()
+        tc = write_builder.new_commit()
+        batch = pa.Table.from_pydict({
+            'id': list(range(20)),
+            'name': [f'item_{i}' for i in range(20)],
+            'data': [blob_bytes] * 20,
+        }, schema=pa_schema)
+        tw.write_arrow(batch)
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        from pypaimon.common.predicate_builder import PredicateBuilder
+        from pypaimon.table.special_fields import SpecialFields
+
+        rb = table.new_read_builder()
+        fields = list(table.fields)
+        fields.append(SpecialFields.ROW_ID)
+        pb = PredicateBuilder(fields)
+        pred = pb.equal(SpecialFields.ROW_ID.name, 5)
+        rb.with_filter(pred)
+
+        scan = rb.new_scan()
+        splits = scan.plan().splits()
+        read = rb.new_read()
+        result = read.to_arrow(splits)
+        self.assertEqual(result.num_rows, 1)
+
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to