This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e8dd5d3cfd [python] supports random access of blob files (#8033)
e8dd5d3cfd is described below
commit e8dd5d3cfdd29661da5152b44b972fbf3859ac9c
Author: Faiz <[email protected]>
AuthorDate: Fri May 29 21:49:12 2026 +0800
[python] supports random access of blob files (#8033)
---
.../pypaimon/read/reader/format_blob_reader.py | 23 ++++++++-
paimon-python/pypaimon/read/split_read.py | 8 +--
paimon-python/pypaimon/tests/blob_table_test.py | 9 +++-
paimon-python/pypaimon/tests/blob_test.py | 58 ++++++++++++++++++++++
4 files changed, 92 insertions(+), 6 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index 94b2f157c0..52f197097f 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -37,7 +37,7 @@ class FormatBlobReader(RecordBatchReader):
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
full_fields: List[DataField], push_down_predicate: Any,
blob_as_descriptor: bool,
- batch_size: int = 1024):
+ batch_size: int = 1024, row_indices: Optional[Any] = None):
self._file_io = file_io
self._file_path = file_path
self._push_down_predicate = push_down_predicate
@@ -56,6 +56,7 @@ class FormatBlobReader(RecordBatchReader):
self._file_size = file_io.get_file_size(file_path)
self._input_stream = file_io.new_input_stream(file_path)
self._read_index()
+ self._apply_row_indices(row_indices)
if self._blob_as_descriptor:
self._input_stream.close()
self._input_stream = None
@@ -187,6 +188,26 @@ class FormatBlobReader(RecordBatchReader):
self.blob_lengths = blob_lengths
self.blob_offsets = blob_offsets
+ def _apply_row_indices(self, row_indices: Optional[Any]) -> None:  # Narrow the blob index to the requested local row positions, in the given order (duplicates are kept).
+ if row_indices is None:  # No row selection: keep every blob in the file.
+ return
+
+ selected_lengths = []
+ selected_offsets = []
+ record_count = len(self.blob_lengths)  # Number of blobs recorded in the file index read by _read_index().
+ for row_index in row_indices:
+ row_index = int(row_index)  # Coerce each index with int(); presumably callers may pass integer-like array elements — confirm.
+ if row_index < 0 or row_index >= record_count:  # Reject negatives explicitly instead of letting Python wrap them to the end of the list.
+ raise IndexError(  # NOTE(review): __init__ opens self._input_stream before calling this; raising here may leave that stream unclosed — confirm.
+ f"Blob row index {row_index} is out of range for file "
+ f"{self.file_path}, record count: {record_count}."
+ )
+ selected_lengths.append(self.blob_lengths[row_index])
+ selected_offsets.append(self.blob_offsets[row_index])
+
+ self.blob_lengths = selected_lengths  # Subsequent reads only see the selected blobs.
+ self.blob_offsets = selected_offsets
+
class BlobRecordIterator:
MAGIC_NUMBER_SIZE = 4
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 351f8de83e..2a9a8f3db7 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -190,13 +190,14 @@ class SplitRead(ABC):
batch_size = self.table.options.read_batch_size()
- # Convert global row_ranges (IndexedSplit) to local row_indices for
Vortex/Lance native pushdown
+ # Convert global row_ranges (IndexedSplit) to local row_indices for
native pushdown.
row_indices = None
if row_ranges is not None:
effective_row_ranges = Range.and_(row_ranges,
[file.row_id_range()])
if len(effective_row_ranges) == 0:
return EmptyRecordBatchReader()
- row_index_formats = (CoreOptions.FILE_FORMAT_VORTEX,
+ row_index_formats = (CoreOptions.FILE_FORMAT_BLOB,
+ CoreOptions.FILE_FORMAT_VORTEX,
CoreOptions.FILE_FORMAT_LANCE,
CoreOptions.FILE_FORMAT_ROW)
if file_format in row_index_formats:
@@ -240,7 +241,8 @@ class SplitRead(ABC):
blob_as_descriptor =
CoreOptions.blob_as_descriptor(self.table.options)
format_reader = FormatBlobReader(self.table.file_io, file_path,
read_file_fields,
self.read_fields,
read_arrow_predicate, blob_as_descriptor,
- batch_size=batch_size)
+ batch_size=batch_size,
+ row_indices=row_indices)
elif file_format == CoreOptions.FILE_FORMAT_LANCE:
if has_nested:
raise NotImplementedError(
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 042c47699a..5eaa57b056 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -3177,14 +3177,14 @@ class DedicatedFormatWriterTest(unittest.TestCase):
self.catalog.create_table('test_db.blob_rowid_equal', schema, False)
table = self.catalog.get_table('test_db.blob_rowid_equal')
- blob_bytes = os.urandom(248 * 1024)
+ blob_values = [bytes([i]) * (248 * 1024) for i in range(20)]
write_builder = table.new_batch_write_builder()
tw = write_builder.new_write()
tc = write_builder.new_commit()
batch = pa.Table.from_pydict({
'id': list(range(20)),
'name': [f'item_{i}' for i in range(20)],
- 'data': [blob_bytes] * 20,
+ 'data': blob_values,
}, schema=pa_schema)
tw.write_arrow(batch)
tc.commit(tw.prepare_commit())
@@ -3195,6 +3195,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
from pypaimon.table.special_fields import SpecialFields
rb = table.new_read_builder()
+ rb.with_projection(['id', 'name', 'data', SpecialFields.ROW_ID.name])
fields = list(table.fields)
fields.append(SpecialFields.ROW_ID)
pb = PredicateBuilder(fields)
@@ -3206,6 +3207,10 @@ class DedicatedFormatWriterTest(unittest.TestCase):
read = rb.new_read()
result = read.to_arrow(splits)
self.assertEqual(result.num_rows, 1)
+ self.assertEqual(result.column('id').to_pylist(), [5])
+ self.assertEqual(result.column('name').to_pylist(), ['item_5'])
+ self.assertEqual(result.column('data').to_pylist(), [blob_values[5]])
+ self.assertEqual(result.column(SpecialFields.ROW_ID.name).to_pylist(),
[5])
def test_rename_blob_column_should_fail(self):
pa_schema = pa.schema([
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
index 9d54949e2f..b91ffdaf43 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -726,6 +726,64 @@ class BlobEndToEndTest(unittest.TestCase):
self.assertEqual(counting_file_io.input_stream_count, 1)
reader.close()
+ def test_blob_reader_row_indices_pushdown(self):  # Reader built with row_indices yields only the selected rows, batched by batch_size.
+ file_io = LocalFileIO(self.temp_dir, Options({}))
+ blob_field_name = "blob_field"
+ blob_data = [f"value_{i}".encode("utf-8") for i in range(6)]  # Six distinct payloads so each selected row is identifiable.
+ schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+ table = pa.table([blob_data], schema=schema)
+ blob_file_path = Path(self.temp_dir) / "row_indices.blob"
+ blob_file_url = _to_url(blob_file_path)
+ file_io.write_blob(blob_file_url, table)
+
+ read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+ reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=str(blob_file_path),
+ read_fields=[blob_field_name],
+ full_fields=read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False,
+ batch_size=2,  # Small batch so the three selected rows span two batches.
+ row_indices=[1, 3, 4],  # Non-contiguous subset of the six rows.
+ )
+ try:
+ batch = reader.read_arrow_batch()  # First batch: selected rows 1 and 3.
+ self.assertIsNotNone(batch)
+ self.assertEqual(batch.column(0).to_pylist(), [blob_data[1],
blob_data[3]])
+
+ batch = reader.read_arrow_batch()  # Second batch: the remaining selected row 4.
+ self.assertIsNotNone(batch)
+ self.assertEqual(batch.column(0).to_pylist(), [blob_data[4]])
+
+ self.assertIsNone(reader.read_arrow_batch())  # Exhausted once all selected rows are returned.
+ finally:
+ reader.close()
+
+ def test_blob_reader_row_indices_out_of_range(self):  # An index past the last row makes the reader constructor raise IndexError.
+ file_io = LocalFileIO(self.temp_dir, Options({}))
+ blob_field_name = "blob_field"
+ blob_data = [b"value_0", b"value_1"]  # Two rows, so valid indices are 0 and 1.
+ schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+ table = pa.table([blob_data], schema=schema)
+ blob_file_path = Path(self.temp_dir) / "row_indices_out_of_range.blob"
+ blob_file_url = _to_url(blob_file_path)
+ file_io.write_blob(blob_file_url, table)
+
+ read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+ with self.assertRaises(IndexError) as context:  # Validation happens during construction, not on the first read.
+ FormatBlobReader(
+ file_io=file_io,
+ file_path=str(blob_file_path),
+ read_fields=[blob_field_name],
+ full_fields=read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False,
+ row_indices=[0, 2],  # 2 is one past the last row.
+ )
+
+ self.assertIn("Blob row index 2 is out of range",
str(context.exception))
+
def test_blob_complex_types_throw_exception(self):
"""Test that complex types containing BLOB elements throw exceptions
during read/write operations."""
from pypaimon.schema.data_types import DataField, AtomicType,
ArrayType, MultisetType, MapType