This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d6965c9419 [python] Push down shard range to Vortex and unify Lance range parameter (#7673)
d6965c9419 is described below
commit d6965c941961d6353921c4c508d5cbcb97ddf2a5
Author: ChengHui Chen <[email protected]>
AuthorDate: Sat May 23 22:58:19 2026 +0800
[python] Push down shard range to Vortex and unify Lance range parameter (#7673)
---
.../pypaimon/read/reader/data_file_batch_reader.py | 17 ++-
.../pypaimon/read/reader/format_lance_reader.py | 12 +-
.../pypaimon/read/reader/format_vortex_reader.py | 11 +-
paimon-python/pypaimon/read/split_read.py | 59 +++++----
.../pypaimon/tests/reader_append_only_test.py | 141 +++++++++++++++++++++
5 files changed, 204 insertions(+), 36 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 4035dcc4f0..f71a8ed9eb 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -42,7 +42,8 @@ class DataFileBatchReader(RecordBatchReader):
system_fields: dict,
blob_as_descriptor: bool = False,
blob_descriptor_fields: Optional[set] = None,
- file_io: Optional[FileIO] = None):
+ file_io: Optional[FileIO] = None,
+ row_id_offsets: Optional[List[int]] = None):
self.format_reader = format_reader
self.index_mapping = index_mapping
self.partition_info = partition_info
@@ -50,6 +51,8 @@ class DataFileBatchReader(RecordBatchReader):
self.schema_map = {field.name: field for field in
PyarrowFieldParser.from_paimon_schema(fields)}
self.row_tracking_enabled = row_tracking_enabled
self.first_row_id = first_row_id
+ self.row_id_offsets = row_id_offsets
+ self._row_id_cursor = 0
self.max_sequence_number = max_sequence_number
self.system_fields = system_fields
self.blob_as_descriptor = blob_as_descriptor
@@ -208,9 +211,15 @@ class DataFileBatchReader(RecordBatchReader):
# Handle _ROW_ID field
if SpecialFields.ROW_ID.name in self.system_fields.keys():
idx = self.system_fields[SpecialFields.ROW_ID.name]
- # Create a new array that fills with computed row IDs
- arrays[idx] = pa.array(range(self.first_row_id, self.first_row_id
+ record_batch.num_rows), type=pa.int64())
- self.first_row_id += record_batch.num_rows
+ if self.row_id_offsets is not None:
+ end = self._row_id_cursor + record_batch.num_rows
+ row_ids = [self.first_row_id + o for o in
self.row_id_offsets[self._row_id_cursor:end]]
+ arrays[idx] = pa.array(row_ids, type=pa.int64())
+ self._row_id_cursor = end
+ else:
+ row_id_range = range(self.first_row_id, self.first_row_id +
record_batch.num_rows)
+ arrays[idx] = pa.array(row_id_range, type=pa.int64())
+ self.first_row_id += record_batch.num_rows
# Handle _SEQUENCE_NUMBER field
if SpecialFields.SEQUENCE_NUMBER.name in self.system_fields.keys():
diff --git a/paimon-python/pypaimon/read/reader/format_lance_reader.py b/paimon-python/pypaimon/read/reader/format_lance_reader.py
index 6749976088..67edfbd025 100644
--- a/paimon-python/pypaimon/read/reader/format_lance_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_lance_reader.py
@@ -34,10 +34,12 @@ class FormatLanceReader(RecordBatchReader):
and filters it based on the provided predicate and projection.
"""
+ # row_indices: from IndexedSplit (ANN vector search), discrete local row
offsets within the file.
+ # shard_range: from SlicedSplit (parallel shard scan), a contiguous
[start, end) row range within the file.
def __init__(self, file_io: FileIO, file_path: str, read_fields:
List[DataField],
push_down_predicate: Any, batch_size: int = 1024,
- row_range: Optional[Tuple[int, int]] = None,
- row_indices: Optional[List[int]] = None):
+ row_indices: Optional[List[int]] = None,
+ shard_range: Optional[Tuple[int, int]] = None):
"""Initialize Lance reader."""
import lance
@@ -64,11 +66,11 @@ class FormatLanceReader(RecordBatchReader):
columns=columns_for_lance)
if row_indices is not None:
reader_results = lance_reader.take_rows(row_indices)
- elif row_range is not None:
- start, end = row_range
- reader_results = lance_reader.read_range(start, end - start)
+ elif shard_range is not None:
+ reader_results = lance_reader.read_range(shard_range[0],
shard_range[1] - shard_range[0])
else:
reader_results = lance_reader.read_all()
+
pa_table = reader_results.to_table()
# Precompute output schema for missing fields
diff --git a/paimon-python/pypaimon/read/reader/format_vortex_reader.py b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
index 53486567e3..ac48ca1abe 100644
--- a/paimon-python/pypaimon/read/reader/format_vortex_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-from typing import List, Optional, Any, Set
+from typing import List, Optional, Any, Set, Tuple
import pyarrow as pa
from pyarrow import RecordBatch
@@ -32,9 +32,12 @@ class FormatVortexReader(RecordBatchReader):
and filters it based on the provided predicate and projection.
"""
+ # row_indices: from IndexedSplit (ANN vector search), discrete local row
offsets within the file.
+ # shard_range: from SlicedSplit (parallel shard scan), a contiguous
[start, end) row range within the file.
def __init__(self, file_io: FileIO, file_path: str, read_fields:
List[DataField],
push_down_predicate: Any, batch_size: int = 1024,
- row_indices: Optional[Any] = None,
+ row_indices: Optional[List[int]] = None,
+ shard_range: Optional[Tuple[int, int]] = None,
predicate_fields: Optional[Set[str]] = None):
import vortex
@@ -71,6 +74,10 @@ class FormatVortexReader(RecordBatchReader):
indices = None
if row_indices is not None:
indices = vortex.array(row_indices)
+ elif shard_range is not None:
+ # Vortex lacks a native range/slice scan API, so we materialize an
+ # index array. Acceptable trade-off vs reading the full file.
+ indices = vortex.array(range(shard_range[0], shard_range[1]))
self.record_batch_reader = vortex_file.scan(
columns_for_vortex, expr=vortex_expr, indices=indices,
batch_size=batch_size).to_arrow()
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 2f233af743..c90d406019 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -165,10 +165,12 @@ class SplitRead(ABC):
def create_reader(self) -> RecordReader:
"""Create a record reader for the given split."""
+ # row_ranges: from IndexedSplit (ANN vector search), a list of discrete
global row ID ranges.
+ # shard_range: from SlicedSplit (parallel shard scan), a contiguous
[start, end) row range within the file.
def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
read_fields: List[str], row_tracking_enabled:
bool,
- row_ranges=None,
- row_range: Optional[Tuple[int, int]] = None) ->
RecordBatchReader:
+ row_ranges: Optional[List[Range]] = None,
+ shard_range: Optional[Tuple[int, int]] = None) ->
RecordBatchReader:
(read_file_fields, read_arrow_predicate) =
self._get_fields_and_predicate(file.schema_id, read_fields)
# Use external_path if available, otherwise use file_path
@@ -177,14 +179,13 @@ class SplitRead(ABC):
batch_size = self.table.options.read_batch_size()
- # Compute effective row ranges and Vortex row_indices from row_ranges
+ # Convert global row_ranges (IndexedSplit) to local row_indices for
Vortex/Lance native pushdown
row_indices = None
if row_ranges is not None:
effective_row_ranges = Range.and_(row_ranges,
[file.row_id_range()])
if len(effective_row_ranges) == 0:
return EmptyRecordBatchReader()
if file_format in (CoreOptions.FILE_FORMAT_VORTEX,
CoreOptions.FILE_FORMAT_LANCE):
- # Convert global row ranges to local file indices for native
pushdown
row_indices = []
for r in effective_row_ranges:
start = r.from_ - file.first_row_id
@@ -233,7 +234,8 @@ class SplitRead(ABC):
ordered_read_fields = [name_to_field[n] for n in read_file_fields
if n in name_to_field]
format_reader = FormatLanceReader(self.table.file_io, file_path,
ordered_read_fields,
read_arrow_predicate,
batch_size=batch_size,
- row_range=row_range,
row_indices=row_indices)
+ row_indices=row_indices,
+ shard_range=shard_range)
elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
if has_nested:
raise NotImplementedError(
@@ -243,6 +245,7 @@ class SplitRead(ABC):
format_reader = FormatVortexReader(self.table.file_io, file_path,
ordered_read_fields,
read_arrow_predicate,
batch_size=batch_size,
row_indices=row_indices,
+ shard_range=shard_range,
predicate_fields=predicate_fields)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format ==
CoreOptions.FILE_FORMAT_ORC:
ordered_read_fields = [name_to_field[n] for n in read_file_fields
if n in name_to_field]
@@ -272,6 +275,15 @@ class SplitRead(ABC):
SpecialFields.row_type_with_row_tracking(self.table.table_schema.fields)
if row_tracking_enabled else self.table.table_schema.fields
)
+
+ # When native shard pushdown is used, the format reader only returns
rows
+ # starting from shard_range[0], so _ROW_ID must be offset accordingly.
+ effective_first_row_id = file.first_row_id
+ if (shard_range is not None and file.first_row_id is not None
+ and file_format in (
+ CoreOptions.FILE_FORMAT_VORTEX,
CoreOptions.FILE_FORMAT_LANCE)):
+ effective_first_row_id = file.first_row_id + shard_range[0]
+
if for_merge_read:
reader = DataFileBatchReader(
format_reader,
@@ -280,12 +292,13 @@ class SplitRead(ABC):
self.trimmed_primary_key,
table_schema_fields,
file.max_sequence_number,
- file.first_row_id,
+ effective_first_row_id,
row_tracking_enabled,
system_fields,
blob_as_descriptor=blob_as_descriptor,
blob_descriptor_fields=blob_descriptor_fields,
- file_io=self.table.file_io)
+ file_io=self.table.file_io,
+ row_id_offsets=row_indices)
else:
reader = DataFileBatchReader(
format_reader,
@@ -294,17 +307,23 @@ class SplitRead(ABC):
None,
table_schema_fields,
file.max_sequence_number,
- file.first_row_id,
+ effective_first_row_id,
row_tracking_enabled,
system_fields,
blob_as_descriptor=blob_as_descriptor,
blob_descriptor_fields=blob_descriptor_fields,
- file_io=self.table.file_io)
+ file_io=self.table.file_io,
+ row_id_offsets=row_indices)
# For non-Vortex formats, wrap with RowIdFilterRecordBatchReader
if row_ranges is not None and row_indices is None:
reader = RowIdFilterRecordBatchReader(reader, file.first_row_id,
effective_row_ranges)
+ # For formats without native shard support, wrap with ShardBatchReader
+ if shard_range is not None and file_format not in (
+ CoreOptions.FILE_FORMAT_VORTEX, CoreOptions.FILE_FORMAT_LANCE):
+ reader = ShardBatchReader(reader, shard_range[0], shard_range[1])
+
return reader
def _get_fields_and_predicate(self, schema_id: int, read_fields):
@@ -513,7 +532,6 @@ class SplitRead(ABC):
class RawFileSplitRead(SplitRead):
def raw_reader_supplier(self, file: DataFileMeta, dv_factory:
Optional[Callable] = None) -> Optional[RecordReader]:
read_fields = self._get_final_read_data_fields()
- # If the current file needs to be further divided for reading, use
ShardBatchReader
# Check if this is a SlicedSplit to get shard_file_idx_map
shard_file_idx_map = (
self.split.shard_file_idx_map() if isinstance(self.split,
SlicedSplit) else {}
@@ -522,21 +540,12 @@ class RawFileSplitRead(SplitRead):
(start_pos, end_pos) = shard_file_idx_map[file.file_name]
if (start_pos, end_pos) == (-1, -1):
return None
- file_path = file.external_path if file.external_path else
file.file_path
- file_format = format_identifier(os.path.basename(file_path))
- if file_format == CoreOptions.FILE_FORMAT_LANCE:
- file_batch_reader = self.file_reader_supplier(
- file=file,
- for_merge_read=False,
- read_fields=read_fields,
- row_tracking_enabled=True,
- row_range=(start_pos, end_pos))
- else:
- file_batch_reader = ShardBatchReader(self.file_reader_supplier(
- file=file,
- for_merge_read=False,
- read_fields=read_fields,
- row_tracking_enabled=True), start_pos, end_pos)
+ file_batch_reader = self.file_reader_supplier(
+ file=file,
+ for_merge_read=False,
+ read_fields=read_fields,
+ row_tracking_enabled=True,
+ shard_range=(start_pos, end_pos))
else:
file_batch_reader = self.file_reader_supplier(
file=file,
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 5cb8a2dfca..68d268d655 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -225,6 +225,25 @@ class AoReaderTest(unittest.TestCase):
expected = self.expected.select(['dt', 'user_id'])
self.assertEqual(actual, expected)
+ @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
+ def test_vortex_ao_reader_with_shard(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'], options={'file.format': 'vortex'})
+ self.catalog.create_table('default.test_append_only_vortex_shard',
schema, False)
+ table = self.catalog.get_table('default.test_append_only_vortex_shard')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+
+ shard_tables = []
+ total_shards = 3
+ for i in range(total_shards):
+ splits = read_builder.new_scan().with_shard(i,
total_shards).plan().splits()
+ shard_tables.append(table_read.to_arrow(splits))
+
+ actual = pa.concat_tables(shard_tables).sort_by('user_id')
+ self.assertEqual(actual, self.expected)
+
def test_lance_ao_reader_with_filter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'], options={'file.format': 'lance'})
self.catalog.create_table('default.test_append_only_lance_filter',
schema, False)
@@ -246,6 +265,24 @@ class AoReaderTest(unittest.TestCase):
])
self.assertEqual(actual.sort_by('user_id'), expected)
+ def test_lance_ao_reader_with_shard(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'], options={'file.format': 'lance'})
+ self.catalog.create_table('default.test_append_only_lance_shard',
schema, False)
+ table = self.catalog.get_table('default.test_append_only_lance_shard')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+
+ shard_tables = []
+ total_shards = 3
+ for i in range(total_shards):
+ splits = read_builder.new_scan().with_shard(i,
total_shards).plan().splits()
+ shard_tables.append(table_read.to_arrow(splits))
+
+ actual = pa.concat_tables(shard_tables).sort_by('user_id')
+ self.assertEqual(actual, self.expected)
+
def test_lance_sliced_split_row_range_pushdown(self):
"""
SlicedSplit with Lance format calls read_range() instead of read_all(),
@@ -326,6 +363,51 @@ class AoReaderTest(unittest.TestCase):
result_ids = sorted(result.column('id').to_pylist())
self.assertEqual(result_ids, list(range(slice_start, slice_end)))
+ def test_lance_shard_row_id_correctness(self):
+ """
+ with_shard splits a file into contiguous ranges via SlicedSplit.
+ _ROW_ID across all shards must equal the full table's _ROW_ID.
+ """
+ try:
+ import lance # noqa: F401
+ except ImportError:
+ self.skipTest("lance not installed")
+
+ schema = Schema.from_pyarrow_schema(
+ pa.schema([('id', pa.int64()), ('value', pa.string())]),
+ options={'file.format': 'lance',
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'})
+ self.catalog.create_table('default.test_lance_shard_row_id', schema,
False)
+ table = self.catalog.get_table('default.test_lance_shard_row_id')
+
+ n = 1000
+ pa_table = pa.table({'id': list(range(n)), 'value': [f'v{i}' for i in
range(n)]})
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Read full table _ROW_ID as ground truth
+ read_builder = table.new_read_builder().with_projection(['id',
'_ROW_ID'])
+ full_splits = read_builder.new_scan().plan().splits()
+ table_read = read_builder.new_read()
+ full_result = table_read.to_arrow(full_splits)
+ expected_row_ids = sorted(full_result.column('_ROW_ID').to_pylist())
+
+ # Read via with_shard and collect _ROW_ID from all shards
+ total_shards = 3
+ all_row_ids = []
+ for i in range(total_shards):
+ shard_splits = read_builder.new_scan().with_shard(i,
total_shards).plan().splits()
+ shard_result = table_read.to_arrow(shard_splits)
+ all_row_ids.extend(shard_result.column('_ROW_ID').to_pylist())
+
+ self.assertEqual(sorted(all_row_ids), expected_row_ids)
+
def test_lance_indexed_split_take_rows_pushdown(self):
"""
IndexedSplit row ranges (from ANN global index results) are converted
to
@@ -412,6 +494,65 @@ class AoReaderTest(unittest.TestCase):
result_ids = sorted(result.column('id').to_pylist())
self.assertEqual(result_ids, [50, 300, 700])
+ def test_lance_indexed_split_row_id_correctness(self):
+ """
+ IndexedSplit (ANN vector search) with native take_rows pushdown must
+ return correct _ROW_ID values matching the original global row IDs.
+ """
+ try:
+ import lance # noqa: F401
+ except ImportError:
+ self.skipTest("lance not installed")
+
+ schema = Schema.from_pyarrow_schema(
+ pa.schema([('id', pa.int64()), ('value', pa.string())]),
+ options={'file.format': 'lance',
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'})
+ self.catalog.create_table('default.test_lance_indexed_row_id', schema,
False)
+ table = self.catalog.get_table('default.test_lance_indexed_row_id')
+
+ n = 1000
+ pa_table = pa.table({'id': list(range(n)), 'value': [f'v{i}' for i in
range(n)]})
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Read full table to get ground-truth _ROW_ID for specific rows
+ read_builder = table.new_read_builder().with_projection(['id',
'_ROW_ID'])
+ full_splits = read_builder.new_scan().plan().splits()
+ table_read = read_builder.new_read()
+ full_result = table_read.to_arrow(full_splits)
+ # Build id -> _ROW_ID mapping from full scan
+ id_to_row_id = dict(zip(
+ full_result.column('id').to_pylist(),
+ full_result.column('_ROW_ID').to_pylist()))
+
+ # Construct IndexedSplit targeting specific rows
+ data_split = full_splits[0]
+ file_meta = data_split.files[0]
+ first_row_id = file_meta.first_row_id
+
+ from pypaimon.globalindex.indexed_split import IndexedSplit
+ from pypaimon.utils.range import Range
+ target_local_offsets = [50, 300, 700]
+ target_global_ids = [first_row_id + o for o in target_local_offsets]
+ row_ranges = [Range(g, g) for g in target_global_ids]
+ indexed = IndexedSplit(data_split, row_ranges)
+
+ result = table_read.to_arrow([indexed])
+
+ self.assertEqual(result.num_rows, len(target_global_ids))
+ for i in range(result.num_rows):
+ row_id = result.column('_ROW_ID')[i].as_py()
+ data_id = result.column('id')[i].as_py()
+ self.assertEqual(row_id, id_to_row_id[data_id],
+ f"row id={data_id}: _ROW_ID should be
{id_to_row_id[data_id]}, got {row_id}")
+
def test_append_only_multi_write_once_commit(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'])
self.catalog.create_table('default.test_append_only_multi_once_commit', schema,
False)