This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 95934286ca [python] optimize blob format reader by inline blob data
(#8030)
95934286ca is described below
commit 95934286ca0be7e48f4eb07295d5fe64e9f0b6a6
Author: Faiz <[email protected]>
AuthorDate: Fri May 29 21:25:03 2026 +0800
[python] optimize blob format reader by inline blob data (#8030)
I'm testing random access of Python blobs. Reading 100 blobs of 0.5 MB each took
30s, even with blob-as-descriptor = true. This is because each blob opens
its own input stream.
---
.../pypaimon/read/reader/format_blob_reader.py | 133 +++++++++++++--------
paimon-python/pypaimon/tests/blob_test.py | 42 +++++++
2 files changed, 124 insertions(+), 51 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py
b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index 355fb36dc4..94b2f157c0 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -16,7 +16,7 @@
# under the License.
import struct
-from typing import List, Optional, Any, Iterator
+from typing import List, Optional, Any, Iterator, BinaryIO
import pyarrow as pa
import pyarrow.dataset as ds
@@ -43,27 +43,33 @@ class FormatBlobReader(RecordBatchReader):
self._push_down_predicate = push_down_predicate
self._blob_as_descriptor = blob_as_descriptor
self._batch_size = batch_size
- # Get file size
- self._file_size = file_io.get_file_size(file_path)
# Initialize the low-level blob format reader
self.file_path = file_path
self.blob_lengths: List[int] = []
self.blob_offsets: List[int] = []
self.returned = False
- self._read_index()
-
- # Set up fields and schema
- if len(read_fields) > 1:
- raise RuntimeError("Blob reader only supports one field.")
- self._fields = read_fields
- full_fields_map = {field.name: field for field in full_fields}
- projected_data_fields = [full_fields_map[name] for name in read_fields]
- self._schema =
PyarrowFieldParser.from_paimon_schema(projected_data_fields)
-
- # Initialize iterator
+ self._input_stream = None
self._blob_iterator = None
self._current_batch = None
+ try:
+ self._file_size = file_io.get_file_size(file_path)
+ self._input_stream = file_io.new_input_stream(file_path)
+ self._read_index()
+ if self._blob_as_descriptor:
+ self._input_stream.close()
+ self._input_stream = None
+
+ # Set up fields and schema
+ if len(read_fields) > 1:
+ raise RuntimeError("Blob reader only supports one field.")
+ self._fields = read_fields
+ full_fields_map = {field.name: field for field in full_fields}
+ projected_data_fields = [full_fields_map[name] for name in
read_fields]
+ self._schema =
PyarrowFieldParser.from_paimon_schema(projected_data_fields)
+ except Exception:
+ self.close()
+ raise
def read_arrow_batch(self, start_idx=None, end_idx=None) ->
Optional[RecordBatch]:
"""
@@ -76,7 +82,7 @@ class FormatBlobReader(RecordBatchReader):
self.returned = True
batch_iterator = BlobRecordIterator(
self._file_io, self.file_path, self.blob_lengths,
- self.blob_offsets, self._fields[0]
+ self.blob_offsets, self._fields[0], self._input_stream
)
self._blob_iterator = iter(batch_iterator)
read_size = self._batch_size
@@ -140,42 +146,46 @@ class FormatBlobReader(RecordBatchReader):
def close(self):
self._blob_iterator = None
+ if self._input_stream is not None:
+ self._input_stream.close()
+ self._input_stream = None
def _read_index(self) -> None:
- with self._file_io.new_input_stream(self.file_path) as f:
- # Seek to header: last 5 bytes
- f.seek(self._file_size - 5)
- header = f.read(5)
-
- if len(header) != 5:
- raise IOError("Invalid blob file: cannot read header")
-
- # Parse header
- index_length = struct.unpack('<I', header[:4])[0] # Little endian
- version = header[4]
-
- if version != 1:
- raise IOError(f"Unsupported blob file version: {version}")
-
- # Read index data
- f.seek(self._file_size - 5 - index_length)
- index_bytes = f.read(index_length)
-
- if len(index_bytes) != index_length:
- raise IOError("Invalid blob file: cannot read index")
-
- # Decompress blob lengths and compute offsets
- blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
- blob_offsets = []
- offset = 0
- for length in blob_lengths:
- if length < 0:
- blob_offsets.append(-1)
- else:
- blob_offsets.append(offset)
- offset += length
- self.blob_lengths = blob_lengths
- self.blob_offsets = blob_offsets
+ f = self._input_stream
+
+ # Seek to header: last 5 bytes
+ f.seek(self._file_size - 5)
+ header = f.read(5)
+
+ if len(header) != 5:
+ raise IOError("Invalid blob file: cannot read header")
+
+ # Parse header
+ index_length = struct.unpack('<I', header[:4])[0] # Little endian
+ version = header[4]
+
+ if version != 1:
+ raise IOError(f"Unsupported blob file version: {version}")
+
+ # Read index data
+ f.seek(self._file_size - 5 - index_length)
+ index_bytes = f.read(index_length)
+
+ if len(index_bytes) != index_length:
+ raise IOError("Invalid blob file: cannot read index")
+
+ # Decompress blob lengths and compute offsets
+ blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
+ blob_offsets = []
+ offset = 0
+ for length in blob_lengths:
+ if length < 0:
+ blob_offsets.append(-1)
+ else:
+ blob_offsets.append(offset)
+ offset += length
+ self.blob_lengths = blob_lengths
+ self.blob_offsets = blob_offsets
class BlobRecordIterator:
@@ -185,9 +195,11 @@ class BlobRecordIterator:
PLACE_HOLDER_LENGTH = -2
def __init__(self, file_io: FileIO, file_path: str, blob_lengths:
List[int],
- blob_offsets: List[int], field_name: str):
+ blob_offsets: List[int], field_name: str,
+ input_stream: Optional[BinaryIO] = None):
self.file_io = file_io
self.file_path = file_path
+ self.input_stream = input_stream
self.field_name = field_name
self.blob_lengths = blob_lengths
self.blob_offsets = blob_offsets
@@ -211,9 +223,28 @@ class BlobRecordIterator:
# Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4
bytes) = 12 bytes
blob_offset = self.blob_offsets[self.current_position] +
self.MAGIC_NUMBER_SIZE # Skip magic number
blob_length = length - self.METADATA_OVERHEAD
- blob = Blob.from_file(self.file_io, self.file_path, blob_offset,
blob_length)
+ if self.input_stream is not None:
+ blob = Blob.from_data(self._read_inline_blob(blob_offset,
blob_length))
+ else:
+ blob = Blob.from_file(self.file_io, self.file_path, blob_offset,
blob_length)
self.current_position += 1
return GenericRow([blob], fields, RowKind.INSERT)
def returned_position(self) -> int:
return self.current_position
+
+ def _read_inline_blob(self, position: int, length: int) -> bytes:
+ self.input_stream.seek(position)
+ data = self._read_fully(length)
+ if len(data) != length:
+ raise IOError("Invalid blob file: cannot read blob data")
+ return data
+
+ def _read_fully(self, length: int) -> bytes:
+ data = bytearray()
+ while len(data) < length:
+ chunk = self.input_stream.read(length - len(data))
+ if not chunk:
+ break
+ data.extend(chunk)
+ return bytes(data)
diff --git a/paimon-python/pypaimon/tests/blob_test.py
b/paimon-python/pypaimon/tests/blob_test.py
index e6b856432b..9d54949e2f 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -684,6 +684,48 @@ class BlobEndToEndTest(unittest.TestCase):
reader.close()
+ def test_blob_read_inline_bytes_reuses_reader_stream(self):
+ class CountingFileIO:
+
+ def __init__(self, delegate):
+ self._delegate = delegate
+ self.input_stream_count = 0
+
+ def __getattr__(self, name):
+ return getattr(self._delegate, name)
+
+ def new_input_stream(self, path):
+ self.input_stream_count += 1
+ return self._delegate.new_input_stream(path)
+
+ file_io = LocalFileIO(self.temp_dir, Options({}))
+ blob_field_name = "blob_field"
+ blob_data = [b"hello", b"world"]
+ schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+ table = pa.table([blob_data], schema=schema)
+ blob_file_path = Path(self.temp_dir) / (blob_field_name +
"_inline.blob")
+ blob_file_url = _to_url(blob_file_path)
+ file_io.write_blob(blob_file_url, table)
+
+ counting_file_io = CountingFileIO(file_io)
+ read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+ reader = FormatBlobReader(
+ file_io=counting_file_io,
+ file_path=str(blob_file_path),
+ read_fields=[blob_field_name],
+ full_fields=read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False
+ )
+
+ batch = reader.read_arrow_batch()
+ self.assertIsNotNone(batch)
+ self.assertEqual(batch.num_rows, 2)
+ self.assertEqual(batch.column(0)[0].as_py(), b"hello")
+ self.assertEqual(batch.column(0)[1].as_py(), b"world")
+ self.assertEqual(counting_file_io.input_stream_count, 1)
+ reader.close()
+
def test_blob_complex_types_throw_exception(self):
"""Test that complex types containing BLOB elements throw exceptions
during read/write operations."""
from pypaimon.schema.data_types import DataField, AtomicType,
ArrayType, MultisetType, MapType