This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 95934286ca [python] optimize blob format reader by inline blob data 
(#8030)
95934286ca is described below

commit 95934286ca0be7e48f4eb07295d5fe64e9f0b6a6
Author: Faiz <[email protected]>
AuthorDate: Fri May 29 21:25:03 2026 +0800

    [python] optimize blob format reader by inline blob data (#8030)
    
    I'm testing random access of python's blob. Reading 100 0.5MB blobs cost
    30s even blob-as-descriptor = true. This is because each blob will open
    the input stream once.
---
 .../pypaimon/read/reader/format_blob_reader.py     | 133 +++++++++++++--------
 paimon-python/pypaimon/tests/blob_test.py          |  42 +++++++
 2 files changed, 124 insertions(+), 51 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py 
b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index 355fb36dc4..94b2f157c0 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -16,7 +16,7 @@
 # under the License.
 
 import struct
-from typing import List, Optional, Any, Iterator
+from typing import List, Optional, Any, Iterator, BinaryIO
 
 import pyarrow as pa
 import pyarrow.dataset as ds
@@ -43,27 +43,33 @@ class FormatBlobReader(RecordBatchReader):
         self._push_down_predicate = push_down_predicate
         self._blob_as_descriptor = blob_as_descriptor
         self._batch_size = batch_size
-        # Get file size
-        self._file_size = file_io.get_file_size(file_path)
 
         # Initialize the low-level blob format reader
         self.file_path = file_path
         self.blob_lengths: List[int] = []
         self.blob_offsets: List[int] = []
         self.returned = False
-        self._read_index()
-
-        # Set up fields and schema
-        if len(read_fields) > 1:
-            raise RuntimeError("Blob reader only supports one field.")
-        self._fields = read_fields
-        full_fields_map = {field.name: field for field in full_fields}
-        projected_data_fields = [full_fields_map[name] for name in read_fields]
-        self._schema = 
PyarrowFieldParser.from_paimon_schema(projected_data_fields)
-
-        # Initialize iterator
+        self._input_stream = None
         self._blob_iterator = None
         self._current_batch = None
+        try:
+            self._file_size = file_io.get_file_size(file_path)
+            self._input_stream = file_io.new_input_stream(file_path)
+            self._read_index()
+            if self._blob_as_descriptor:
+                self._input_stream.close()
+                self._input_stream = None
+
+            # Set up fields and schema
+            if len(read_fields) > 1:
+                raise RuntimeError("Blob reader only supports one field.")
+            self._fields = read_fields
+            full_fields_map = {field.name: field for field in full_fields}
+            projected_data_fields = [full_fields_map[name] for name in 
read_fields]
+            self._schema = 
PyarrowFieldParser.from_paimon_schema(projected_data_fields)
+        except Exception:
+            self.close()
+            raise
 
     def read_arrow_batch(self, start_idx=None, end_idx=None) -> 
Optional[RecordBatch]:
         """
@@ -76,7 +82,7 @@ class FormatBlobReader(RecordBatchReader):
             self.returned = True
             batch_iterator = BlobRecordIterator(
                 self._file_io, self.file_path, self.blob_lengths,
-                self.blob_offsets, self._fields[0]
+                self.blob_offsets, self._fields[0], self._input_stream
             )
             self._blob_iterator = iter(batch_iterator)
         read_size = self._batch_size
@@ -140,42 +146,46 @@ class FormatBlobReader(RecordBatchReader):
 
     def close(self):
         self._blob_iterator = None
+        if self._input_stream is not None:
+            self._input_stream.close()
+            self._input_stream = None
 
     def _read_index(self) -> None:
-        with self._file_io.new_input_stream(self.file_path) as f:
-            # Seek to header: last 5 bytes
-            f.seek(self._file_size - 5)
-            header = f.read(5)
-
-            if len(header) != 5:
-                raise IOError("Invalid blob file: cannot read header")
-
-            # Parse header
-            index_length = struct.unpack('<I', header[:4])[0]  # Little endian
-            version = header[4]
-
-            if version != 1:
-                raise IOError(f"Unsupported blob file version: {version}")
-
-            # Read index data
-            f.seek(self._file_size - 5 - index_length)
-            index_bytes = f.read(index_length)
-
-            if len(index_bytes) != index_length:
-                raise IOError("Invalid blob file: cannot read index")
-
-            # Decompress blob lengths and compute offsets
-            blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
-            blob_offsets = []
-            offset = 0
-            for length in blob_lengths:
-                if length < 0:
-                    blob_offsets.append(-1)
-                else:
-                    blob_offsets.append(offset)
-                    offset += length
-            self.blob_lengths = blob_lengths
-            self.blob_offsets = blob_offsets
+        f = self._input_stream
+
+        # Seek to header: last 5 bytes
+        f.seek(self._file_size - 5)
+        header = f.read(5)
+
+        if len(header) != 5:
+            raise IOError("Invalid blob file: cannot read header")
+
+        # Parse header
+        index_length = struct.unpack('<I', header[:4])[0]  # Little endian
+        version = header[4]
+
+        if version != 1:
+            raise IOError(f"Unsupported blob file version: {version}")
+
+        # Read index data
+        f.seek(self._file_size - 5 - index_length)
+        index_bytes = f.read(index_length)
+
+        if len(index_bytes) != index_length:
+            raise IOError("Invalid blob file: cannot read index")
+
+        # Decompress blob lengths and compute offsets
+        blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
+        blob_offsets = []
+        offset = 0
+        for length in blob_lengths:
+            if length < 0:
+                blob_offsets.append(-1)
+            else:
+                blob_offsets.append(offset)
+                offset += length
+        self.blob_lengths = blob_lengths
+        self.blob_offsets = blob_offsets
 
 
 class BlobRecordIterator:
@@ -185,9 +195,11 @@ class BlobRecordIterator:
     PLACE_HOLDER_LENGTH = -2
 
     def __init__(self, file_io: FileIO, file_path: str, blob_lengths: 
List[int],
-                 blob_offsets: List[int], field_name: str):
+                 blob_offsets: List[int], field_name: str,
+                 input_stream: Optional[BinaryIO] = None):
         self.file_io = file_io
         self.file_path = file_path
+        self.input_stream = input_stream
         self.field_name = field_name
         self.blob_lengths = blob_lengths
         self.blob_offsets = blob_offsets
@@ -211,9 +223,28 @@ class BlobRecordIterator:
         # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 
bytes) = 12 bytes
         blob_offset = self.blob_offsets[self.current_position] + 
self.MAGIC_NUMBER_SIZE  # Skip magic number
         blob_length = length - self.METADATA_OVERHEAD
-        blob = Blob.from_file(self.file_io, self.file_path, blob_offset, 
blob_length)
+        if self.input_stream is not None:
+            blob = Blob.from_data(self._read_inline_blob(blob_offset, 
blob_length))
+        else:
+            blob = Blob.from_file(self.file_io, self.file_path, blob_offset, 
blob_length)
         self.current_position += 1
         return GenericRow([blob], fields, RowKind.INSERT)
 
     def returned_position(self) -> int:
         return self.current_position
+
+    def _read_inline_blob(self, position: int, length: int) -> bytes:
+        self.input_stream.seek(position)
+        data = self._read_fully(length)
+        if len(data) != length:
+            raise IOError("Invalid blob file: cannot read blob data")
+        return data
+
+    def _read_fully(self, length: int) -> bytes:
+        data = bytearray()
+        while len(data) < length:
+            chunk = self.input_stream.read(length - len(data))
+            if not chunk:
+                break
+            data.extend(chunk)
+        return bytes(data)
diff --git a/paimon-python/pypaimon/tests/blob_test.py 
b/paimon-python/pypaimon/tests/blob_test.py
index e6b856432b..9d54949e2f 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -684,6 +684,48 @@ class BlobEndToEndTest(unittest.TestCase):
 
             reader.close()
 
+    def test_blob_read_inline_bytes_reuses_reader_stream(self):
+        class CountingFileIO:
+
+            def __init__(self, delegate):
+                self._delegate = delegate
+                self.input_stream_count = 0
+
+            def __getattr__(self, name):
+                return getattr(self._delegate, name)
+
+            def new_input_stream(self, path):
+                self.input_stream_count += 1
+                return self._delegate.new_input_stream(path)
+
+        file_io = LocalFileIO(self.temp_dir, Options({}))
+        blob_field_name = "blob_field"
+        blob_data = [b"hello", b"world"]
+        schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+        table = pa.table([blob_data], schema=schema)
+        blob_file_path = Path(self.temp_dir) / (blob_field_name + 
"_inline.blob")
+        blob_file_url = _to_url(blob_file_path)
+        file_io.write_blob(blob_file_url, table)
+
+        counting_file_io = CountingFileIO(file_io)
+        read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+        reader = FormatBlobReader(
+            file_io=counting_file_io,
+            file_path=str(blob_file_path),
+            read_fields=[blob_field_name],
+            full_fields=read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=False
+        )
+
+        batch = reader.read_arrow_batch()
+        self.assertIsNotNone(batch)
+        self.assertEqual(batch.num_rows, 2)
+        self.assertEqual(batch.column(0)[0].as_py(), b"hello")
+        self.assertEqual(batch.column(0)[1].as_py(), b"world")
+        self.assertEqual(counting_file_io.input_stream_count, 1)
+        reader.close()
+
     def test_blob_complex_types_throw_exception(self):
         """Test that complex types containing BLOB elements throw exceptions 
during read/write operations."""
         from pypaimon.schema.data_types import DataField, AtomicType, 
ArrayType, MultisetType, MapType

Reply via email to