This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e8dd5d3cfd [python] supports random access of blob files (#8033)
e8dd5d3cfd is described below

commit e8dd5d3cfdd29661da5152b44b972fbf3859ac9c
Author: Faiz <[email protected]>
AuthorDate: Fri May 29 21:49:12 2026 +0800

    [python] supports random access of blob files (#8033)
---
 .../pypaimon/read/reader/format_blob_reader.py     | 23 ++++++++-
 paimon-python/pypaimon/read/split_read.py          |  8 +--
 paimon-python/pypaimon/tests/blob_table_test.py    |  9 +++-
 paimon-python/pypaimon/tests/blob_test.py          | 58 ++++++++++++++++++++++
 4 files changed, 92 insertions(+), 6 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index 94b2f157c0..52f197097f 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -37,7 +37,7 @@ class FormatBlobReader(RecordBatchReader):
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
                  full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool,
-                 batch_size: int = 1024):
+                 batch_size: int = 1024, row_indices: Optional[Any] = None):
         self._file_io = file_io
         self._file_path = file_path
         self._push_down_predicate = push_down_predicate
@@ -56,6 +56,7 @@ class FormatBlobReader(RecordBatchReader):
             self._file_size = file_io.get_file_size(file_path)
             self._input_stream = file_io.new_input_stream(file_path)
             self._read_index()
+            self._apply_row_indices(row_indices)
             if self._blob_as_descriptor:
                 self._input_stream.close()
                 self._input_stream = None
@@ -187,6 +188,26 @@ class FormatBlobReader(RecordBatchReader):
         self.blob_lengths = blob_lengths
         self.blob_offsets = blob_offsets
 
+    def _apply_row_indices(self, row_indices: Optional[Any]) -> None:
+        if row_indices is None:
+            return
+
+        selected_lengths = []
+        selected_offsets = []
+        record_count = len(self.blob_lengths)
+        for row_index in row_indices:
+            row_index = int(row_index)
+            if row_index < 0 or row_index >= record_count:
+                raise IndexError(
+                    f"Blob row index {row_index} is out of range for file "
+                    f"{self.file_path}, record count: {record_count}."
+                )
+            selected_lengths.append(self.blob_lengths[row_index])
+            selected_offsets.append(self.blob_offsets[row_index])
+
+        self.blob_lengths = selected_lengths
+        self.blob_offsets = selected_offsets
+
 
 class BlobRecordIterator:
     MAGIC_NUMBER_SIZE = 4
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 351f8de83e..2a9a8f3db7 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -190,13 +190,14 @@ class SplitRead(ABC):
 
         batch_size = self.table.options.read_batch_size()
 
-        # Convert global row_ranges (IndexedSplit) to local row_indices for Vortex/Lance native pushdown
+        # Convert global row_ranges (IndexedSplit) to local row_indices for native pushdown.
         row_indices = None
         if row_ranges is not None:
             effective_row_ranges = Range.and_(row_ranges, [file.row_id_range()])
             if len(effective_row_ranges) == 0:
                 return EmptyRecordBatchReader()
-            row_index_formats = (CoreOptions.FILE_FORMAT_VORTEX,
+            row_index_formats = (CoreOptions.FILE_FORMAT_BLOB,
+                                 CoreOptions.FILE_FORMAT_VORTEX,
                                  CoreOptions.FILE_FORMAT_LANCE,
                                  CoreOptions.FILE_FORMAT_ROW)
             if file_format in row_index_formats:
@@ -240,7 +241,8 @@ class SplitRead(ABC):
             blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options)
             format_reader = FormatBlobReader(self.table.file_io, file_path, read_file_fields,
                                              self.read_fields, read_arrow_predicate, blob_as_descriptor,
-                                             batch_size=batch_size)
+                                             batch_size=batch_size,
+                                             row_indices=row_indices)
         elif file_format == CoreOptions.FILE_FORMAT_LANCE:
             if has_nested:
                 raise NotImplementedError(
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 042c47699a..5eaa57b056 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -3177,14 +3177,14 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         self.catalog.create_table('test_db.blob_rowid_equal', schema, False)
         table = self.catalog.get_table('test_db.blob_rowid_equal')
 
-        blob_bytes = os.urandom(248 * 1024)
+        blob_values = [bytes([i]) * (248 * 1024) for i in range(20)]
         write_builder = table.new_batch_write_builder()
         tw = write_builder.new_write()
         tc = write_builder.new_commit()
         batch = pa.Table.from_pydict({
             'id': list(range(20)),
             'name': [f'item_{i}' for i in range(20)],
-            'data': [blob_bytes] * 20,
+            'data': blob_values,
         }, schema=pa_schema)
         tw.write_arrow(batch)
         tc.commit(tw.prepare_commit())
@@ -3195,6 +3195,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         from pypaimon.table.special_fields import SpecialFields
 
         rb = table.new_read_builder()
+        rb.with_projection(['id', 'name', 'data', SpecialFields.ROW_ID.name])
         fields = list(table.fields)
         fields.append(SpecialFields.ROW_ID)
         pb = PredicateBuilder(fields)
@@ -3206,6 +3207,10 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         read = rb.new_read()
         result = read.to_arrow(splits)
         self.assertEqual(result.num_rows, 1)
+        self.assertEqual(result.column('id').to_pylist(), [5])
+        self.assertEqual(result.column('name').to_pylist(), ['item_5'])
+        self.assertEqual(result.column('data').to_pylist(), [blob_values[5]])
+        self.assertEqual(result.column(SpecialFields.ROW_ID.name).to_pylist(), [5])
 
     def test_rename_blob_column_should_fail(self):
         pa_schema = pa.schema([
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
index 9d54949e2f..b91ffdaf43 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -726,6 +726,64 @@ class BlobEndToEndTest(unittest.TestCase):
         self.assertEqual(counting_file_io.input_stream_count, 1)
         reader.close()
 
+    def test_blob_reader_row_indices_pushdown(self):
+        file_io = LocalFileIO(self.temp_dir, Options({}))
+        blob_field_name = "blob_field"
+        blob_data = [f"value_{i}".encode("utf-8") for i in range(6)]
+        schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+        table = pa.table([blob_data], schema=schema)
+        blob_file_path = Path(self.temp_dir) / "row_indices.blob"
+        blob_file_url = _to_url(blob_file_path)
+        file_io.write_blob(blob_file_url, table)
+
+        read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+        reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=str(blob_file_path),
+            read_fields=[blob_field_name],
+            full_fields=read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=False,
+            batch_size=2,
+            row_indices=[1, 3, 4],
+        )
+        try:
+            batch = reader.read_arrow_batch()
+            self.assertIsNotNone(batch)
+            self.assertEqual(batch.column(0).to_pylist(), [blob_data[1], blob_data[3]])
+
+            batch = reader.read_arrow_batch()
+            self.assertIsNotNone(batch)
+            self.assertEqual(batch.column(0).to_pylist(), [blob_data[4]])
+
+            self.assertIsNone(reader.read_arrow_batch())
+        finally:
+            reader.close()
+
+    def test_blob_reader_row_indices_out_of_range(self):
+        file_io = LocalFileIO(self.temp_dir, Options({}))
+        blob_field_name = "blob_field"
+        blob_data = [b"value_0", b"value_1"]
+        schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+        table = pa.table([blob_data], schema=schema)
+        blob_file_path = Path(self.temp_dir) / "row_indices_out_of_range.blob"
+        blob_file_url = _to_url(blob_file_path)
+        file_io.write_blob(blob_file_url, table)
+
+        read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+        with self.assertRaises(IndexError) as context:
+            FormatBlobReader(
+                file_io=file_io,
+                file_path=str(blob_file_path),
+                read_fields=[blob_field_name],
+                full_fields=read_fields,
+                push_down_predicate=None,
+                blob_as_descriptor=False,
+                row_indices=[0, 2],
+            )
+
+        self.assertIn("Blob row index 2 is out of range", str(context.exception))
+
     def test_blob_complex_types_throw_exception(self):
         """Test that complex types containing BLOB elements throw exceptions during read/write operations."""
         from pypaimon.schema.data_types import DataField, AtomicType, ArrayType, MultisetType, MapType

Reply via email to