This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d6965c9419 [python] Push down shard range to Vortex and unify Lance 
range parameter (#7673)
d6965c9419 is described below

commit d6965c941961d6353921c4c508d5cbcb97ddf2a5
Author: ChengHui Chen <[email protected]>
AuthorDate: Sat May 23 22:58:19 2026 +0800

    [python] Push down shard range to Vortex and unify Lance range parameter 
(#7673)
---
 .../pypaimon/read/reader/data_file_batch_reader.py |  17 ++-
 .../pypaimon/read/reader/format_lance_reader.py    |  12 +-
 .../pypaimon/read/reader/format_vortex_reader.py   |  11 +-
 paimon-python/pypaimon/read/split_read.py          |  59 +++++----
 .../pypaimon/tests/reader_append_only_test.py      | 141 +++++++++++++++++++++
 5 files changed, 204 insertions(+), 36 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py 
b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 4035dcc4f0..f71a8ed9eb 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -42,7 +42,8 @@ class DataFileBatchReader(RecordBatchReader):
                  system_fields: dict,
                  blob_as_descriptor: bool = False,
                  blob_descriptor_fields: Optional[set] = None,
-                 file_io: Optional[FileIO] = None):
+                 file_io: Optional[FileIO] = None,
+                 row_id_offsets: Optional[List[int]] = None):
         self.format_reader = format_reader
         self.index_mapping = index_mapping
         self.partition_info = partition_info
@@ -50,6 +51,8 @@ class DataFileBatchReader(RecordBatchReader):
         self.schema_map = {field.name: field for field in 
PyarrowFieldParser.from_paimon_schema(fields)}
         self.row_tracking_enabled = row_tracking_enabled
         self.first_row_id = first_row_id
+        self.row_id_offsets = row_id_offsets
+        self._row_id_cursor = 0
         self.max_sequence_number = max_sequence_number
         self.system_fields = system_fields
         self.blob_as_descriptor = blob_as_descriptor
@@ -208,9 +211,15 @@ class DataFileBatchReader(RecordBatchReader):
         # Handle _ROW_ID field
         if SpecialFields.ROW_ID.name in self.system_fields.keys():
             idx = self.system_fields[SpecialFields.ROW_ID.name]
-            # Create a new array that fills with computed row IDs
-            arrays[idx] = pa.array(range(self.first_row_id, self.first_row_id 
+ record_batch.num_rows), type=pa.int64())
-            self.first_row_id += record_batch.num_rows
+            if self.row_id_offsets is not None:
+                end = self._row_id_cursor + record_batch.num_rows
+                row_ids = [self.first_row_id + o for o in 
self.row_id_offsets[self._row_id_cursor:end]]
+                arrays[idx] = pa.array(row_ids, type=pa.int64())
+                self._row_id_cursor = end
+            else:
+                row_id_range = range(self.first_row_id, self.first_row_id + 
record_batch.num_rows)
+                arrays[idx] = pa.array(row_id_range, type=pa.int64())
+                self.first_row_id += record_batch.num_rows
 
         # Handle _SEQUENCE_NUMBER field
         if SpecialFields.SEQUENCE_NUMBER.name in self.system_fields.keys():
diff --git a/paimon-python/pypaimon/read/reader/format_lance_reader.py 
b/paimon-python/pypaimon/read/reader/format_lance_reader.py
index 6749976088..67edfbd025 100644
--- a/paimon-python/pypaimon/read/reader/format_lance_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_lance_reader.py
@@ -34,10 +34,12 @@ class FormatLanceReader(RecordBatchReader):
     and filters it based on the provided predicate and projection.
     """
 
+    # row_indices: from IndexedSplit (ANN vector search), discrete local row 
offsets within the file.
+    # shard_range: from SlicedSplit (parallel shard scan), a contiguous 
[start, end) row range within the file.
     def __init__(self, file_io: FileIO, file_path: str, read_fields: 
List[DataField],
                  push_down_predicate: Any, batch_size: int = 1024,
-                 row_range: Optional[Tuple[int, int]] = None,
-                 row_indices: Optional[List[int]] = None):
+                 row_indices: Optional[List[int]] = None,
+                 shard_range: Optional[Tuple[int, int]] = None):
         """Initialize Lance reader."""
         import lance
 
@@ -64,11 +66,11 @@ class FormatLanceReader(RecordBatchReader):
             columns=columns_for_lance)
         if row_indices is not None:
             reader_results = lance_reader.take_rows(row_indices)
-        elif row_range is not None:
-            start, end = row_range
-            reader_results = lance_reader.read_range(start, end - start)
+        elif shard_range is not None:
+            reader_results = lance_reader.read_range(shard_range[0], 
shard_range[1] - shard_range[0])
         else:
             reader_results = lance_reader.read_all()
+
         pa_table = reader_results.to_table()
 
         # Precompute output schema for missing fields
diff --git a/paimon-python/pypaimon/read/reader/format_vortex_reader.py 
b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
index 53486567e3..ac48ca1abe 100644
--- a/paimon-python/pypaimon/read/reader/format_vortex_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import List, Optional, Any, Set
+from typing import List, Optional, Any, Set, Tuple
 
 import pyarrow as pa
 from pyarrow import RecordBatch
@@ -32,9 +32,12 @@ class FormatVortexReader(RecordBatchReader):
     and filters it based on the provided predicate and projection.
     """
 
+    # row_indices: from IndexedSplit (ANN vector search), discrete local row 
offsets within the file.
+    # shard_range: from SlicedSplit (parallel shard scan), a contiguous 
[start, end) row range within the file.
     def __init__(self, file_io: FileIO, file_path: str, read_fields: 
List[DataField],
                  push_down_predicate: Any, batch_size: int = 1024,
-                 row_indices: Optional[Any] = None,
+                 row_indices: Optional[List[int]] = None,
+                 shard_range: Optional[Tuple[int, int]] = None,
                  predicate_fields: Optional[Set[str]] = None):
         import vortex
 
@@ -71,6 +74,10 @@ class FormatVortexReader(RecordBatchReader):
         indices = None
         if row_indices is not None:
             indices = vortex.array(row_indices)
+        elif shard_range is not None:
+            # Vortex lacks a native range/slice scan API, so we materialize an
+            # index array. Acceptable trade-off vs reading the full file.
+            indices = vortex.array(range(shard_range[0], shard_range[1]))
 
         self.record_batch_reader = vortex_file.scan(
             columns_for_vortex, expr=vortex_expr, indices=indices, 
batch_size=batch_size).to_arrow()
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 2f233af743..c90d406019 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -165,10 +165,12 @@ class SplitRead(ABC):
     def create_reader(self) -> RecordReader:
         """Create a record reader for the given split."""
 
+    # row_ranges: from IndexedSplit (ANN vector search), a list of discrete 
global row ID ranges.
+    # shard_range: from SlicedSplit (parallel shard scan), a contiguous 
[start, end) row range within the file.
     def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
                              read_fields: List[str], row_tracking_enabled: 
bool,
-                             row_ranges=None,
-                             row_range: Optional[Tuple[int, int]] = None) -> 
RecordBatchReader:
+                             row_ranges: Optional[List[Range]] = None,
+                             shard_range: Optional[Tuple[int, int]] = None) -> 
RecordBatchReader:
         (read_file_fields, read_arrow_predicate) = 
self._get_fields_and_predicate(file.schema_id, read_fields)
 
         # Use external_path if available, otherwise use file_path
@@ -177,14 +179,13 @@ class SplitRead(ABC):
 
         batch_size = self.table.options.read_batch_size()
 
-        # Compute effective row ranges and Vortex row_indices from row_ranges
+        # Convert global row_ranges (IndexedSplit) to local row_indices for 
Vortex/Lance native pushdown
         row_indices = None
         if row_ranges is not None:
             effective_row_ranges = Range.and_(row_ranges, 
[file.row_id_range()])
             if len(effective_row_ranges) == 0:
                 return EmptyRecordBatchReader()
             if file_format in (CoreOptions.FILE_FORMAT_VORTEX, 
CoreOptions.FILE_FORMAT_LANCE):
-                # Convert global row ranges to local file indices for native 
pushdown
                 row_indices = []
                 for r in effective_row_ranges:
                     start = r.from_ - file.first_row_id
@@ -233,7 +234,8 @@ class SplitRead(ABC):
             ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
             format_reader = FormatLanceReader(self.table.file_io, file_path, 
ordered_read_fields,
                                               read_arrow_predicate, 
batch_size=batch_size,
-                                              row_range=row_range, 
row_indices=row_indices)
+                                              row_indices=row_indices,
+                                              shard_range=shard_range)
         elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
             if has_nested:
                 raise NotImplementedError(
@@ -243,6 +245,7 @@ class SplitRead(ABC):
             format_reader = FormatVortexReader(self.table.file_io, file_path, 
ordered_read_fields,
                                                read_arrow_predicate, 
batch_size=batch_size,
                                                row_indices=row_indices,
+                                               shard_range=shard_range,
                                                
predicate_fields=predicate_fields)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == 
CoreOptions.FILE_FORMAT_ORC:
             ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
@@ -272,6 +275,15 @@ class SplitRead(ABC):
             
SpecialFields.row_type_with_row_tracking(self.table.table_schema.fields)
             if row_tracking_enabled else self.table.table_schema.fields
         )
+
+        # When native shard pushdown is used, the format reader only returns 
rows
+        # starting from shard_range[0], so _ROW_ID must be offset accordingly.
+        effective_first_row_id = file.first_row_id
+        if (shard_range is not None and file.first_row_id is not None
+                and file_format in (
+                    CoreOptions.FILE_FORMAT_VORTEX, 
CoreOptions.FILE_FORMAT_LANCE)):
+            effective_first_row_id = file.first_row_id + shard_range[0]
+
         if for_merge_read:
             reader = DataFileBatchReader(
                 format_reader,
@@ -280,12 +292,13 @@ class SplitRead(ABC):
                 self.trimmed_primary_key,
                 table_schema_fields,
                 file.max_sequence_number,
-                file.first_row_id,
+                effective_first_row_id,
                 row_tracking_enabled,
                 system_fields,
                 blob_as_descriptor=blob_as_descriptor,
                 blob_descriptor_fields=blob_descriptor_fields,
-                file_io=self.table.file_io)
+                file_io=self.table.file_io,
+                row_id_offsets=row_indices)
         else:
             reader = DataFileBatchReader(
                 format_reader,
@@ -294,17 +307,23 @@ class SplitRead(ABC):
                 None,
                 table_schema_fields,
                 file.max_sequence_number,
-                file.first_row_id,
+                effective_first_row_id,
                 row_tracking_enabled,
                 system_fields,
                 blob_as_descriptor=blob_as_descriptor,
                 blob_descriptor_fields=blob_descriptor_fields,
-                file_io=self.table.file_io)
+                file_io=self.table.file_io,
+                row_id_offsets=row_indices)
 
         # For non-Vortex formats, wrap with RowIdFilterRecordBatchReader
         if row_ranges is not None and row_indices is None:
             reader = RowIdFilterRecordBatchReader(reader, file.first_row_id, 
effective_row_ranges)
 
+        # For formats without native shard support, wrap with ShardBatchReader
+        if shard_range is not None and file_format not in (
+                CoreOptions.FILE_FORMAT_VORTEX, CoreOptions.FILE_FORMAT_LANCE):
+            reader = ShardBatchReader(reader, shard_range[0], shard_range[1])
+
         return reader
 
     def _get_fields_and_predicate(self, schema_id: int, read_fields):
@@ -513,7 +532,6 @@ class SplitRead(ABC):
 class RawFileSplitRead(SplitRead):
     def raw_reader_supplier(self, file: DataFileMeta, dv_factory: 
Optional[Callable] = None) -> Optional[RecordReader]:
         read_fields = self._get_final_read_data_fields()
-        # If the current file needs to be further divided for reading, use 
ShardBatchReader
         # Check if this is a SlicedSplit to get shard_file_idx_map
         shard_file_idx_map = (
             self.split.shard_file_idx_map() if isinstance(self.split, 
SlicedSplit) else {}
@@ -522,21 +540,12 @@ class RawFileSplitRead(SplitRead):
             (start_pos, end_pos) = shard_file_idx_map[file.file_name]
             if (start_pos, end_pos) == (-1, -1):
                 return None
-            file_path = file.external_path if file.external_path else 
file.file_path
-            file_format = format_identifier(os.path.basename(file_path))
-            if file_format == CoreOptions.FILE_FORMAT_LANCE:
-                file_batch_reader = self.file_reader_supplier(
-                    file=file,
-                    for_merge_read=False,
-                    read_fields=read_fields,
-                    row_tracking_enabled=True,
-                    row_range=(start_pos, end_pos))
-            else:
-                file_batch_reader = ShardBatchReader(self.file_reader_supplier(
-                    file=file,
-                    for_merge_read=False,
-                    read_fields=read_fields,
-                    row_tracking_enabled=True), start_pos, end_pos)
+            file_batch_reader = self.file_reader_supplier(
+                file=file,
+                for_merge_read=False,
+                read_fields=read_fields,
+                row_tracking_enabled=True,
+                shard_range=(start_pos, end_pos))
         else:
             file_batch_reader = self.file_reader_supplier(
                 file=file,
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py 
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 5cb8a2dfca..68d268d655 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -225,6 +225,25 @@ class AoReaderTest(unittest.TestCase):
         expected = self.expected.select(['dt', 'user_id'])
         self.assertEqual(actual, expected)
 
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    def test_vortex_ao_reader_with_shard(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'], options={'file.format': 'vortex'})
+        self.catalog.create_table('default.test_append_only_vortex_shard', 
schema, False)
+        table = self.catalog.get_table('default.test_append_only_vortex_shard')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+
+        shard_tables = []
+        total_shards = 3
+        for i in range(total_shards):
+            splits = read_builder.new_scan().with_shard(i, 
total_shards).plan().splits()
+            shard_tables.append(table_read.to_arrow(splits))
+
+        actual = pa.concat_tables(shard_tables).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
     def test_lance_ao_reader_with_filter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'], options={'file.format': 'lance'})
         self.catalog.create_table('default.test_append_only_lance_filter', 
schema, False)
@@ -246,6 +265,24 @@ class AoReaderTest(unittest.TestCase):
         ])
         self.assertEqual(actual.sort_by('user_id'), expected)
 
+    def test_lance_ao_reader_with_shard(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'], options={'file.format': 'lance'})
+        self.catalog.create_table('default.test_append_only_lance_shard', 
schema, False)
+        table = self.catalog.get_table('default.test_append_only_lance_shard')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+
+        shard_tables = []
+        total_shards = 3
+        for i in range(total_shards):
+            splits = read_builder.new_scan().with_shard(i, 
total_shards).plan().splits()
+            shard_tables.append(table_read.to_arrow(splits))
+
+        actual = pa.concat_tables(shard_tables).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
     def test_lance_sliced_split_row_range_pushdown(self):
         """
         SlicedSplit with Lance format calls read_range() instead of read_all(),
@@ -326,6 +363,51 @@ class AoReaderTest(unittest.TestCase):
         result_ids = sorted(result.column('id').to_pylist())
         self.assertEqual(result_ids, list(range(slice_start, slice_end)))
 
+    def test_lance_shard_row_id_correctness(self):
+        """
+        with_shard splits a file into contiguous ranges via SlicedSplit.
+        _ROW_ID across all shards must equal the full table's _ROW_ID.
+        """
+        try:
+            import lance  # noqa: F401
+        except ImportError:
+            self.skipTest("lance not installed")
+
+        schema = Schema.from_pyarrow_schema(
+            pa.schema([('id', pa.int64()), ('value', pa.string())]),
+            options={'file.format': 'lance',
+                     'row-tracking.enabled': 'true',
+                     'data-evolution.enabled': 'true'})
+        self.catalog.create_table('default.test_lance_shard_row_id', schema, 
False)
+        table = self.catalog.get_table('default.test_lance_shard_row_id')
+
+        n = 1000
+        pa_table = pa.table({'id': list(range(n)), 'value': [f'v{i}' for i in 
range(n)]})
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Read full table _ROW_ID as ground truth
+        read_builder = table.new_read_builder().with_projection(['id', 
'_ROW_ID'])
+        full_splits = read_builder.new_scan().plan().splits()
+        table_read = read_builder.new_read()
+        full_result = table_read.to_arrow(full_splits)
+        expected_row_ids = sorted(full_result.column('_ROW_ID').to_pylist())
+
+        # Read via with_shard and collect _ROW_ID from all shards
+        total_shards = 3
+        all_row_ids = []
+        for i in range(total_shards):
+            shard_splits = read_builder.new_scan().with_shard(i, 
total_shards).plan().splits()
+            shard_result = table_read.to_arrow(shard_splits)
+            all_row_ids.extend(shard_result.column('_ROW_ID').to_pylist())
+
+        self.assertEqual(sorted(all_row_ids), expected_row_ids)
+
     def test_lance_indexed_split_take_rows_pushdown(self):
         """
         IndexedSplit row ranges (from ANN global index results) are converted 
to
@@ -412,6 +494,65 @@ class AoReaderTest(unittest.TestCase):
         result_ids = sorted(result.column('id').to_pylist())
         self.assertEqual(result_ids, [50, 300, 700])
 
+    def test_lance_indexed_split_row_id_correctness(self):
+        """
+        IndexedSplit (ANN vector search) with native take_rows pushdown must
+        return correct _ROW_ID values matching the original global row IDs.
+        """
+        try:
+            import lance  # noqa: F401
+        except ImportError:
+            self.skipTest("lance not installed")
+
+        schema = Schema.from_pyarrow_schema(
+            pa.schema([('id', pa.int64()), ('value', pa.string())]),
+            options={'file.format': 'lance',
+                     'row-tracking.enabled': 'true',
+                     'data-evolution.enabled': 'true'})
+        self.catalog.create_table('default.test_lance_indexed_row_id', schema, 
False)
+        table = self.catalog.get_table('default.test_lance_indexed_row_id')
+
+        n = 1000
+        pa_table = pa.table({'id': list(range(n)), 'value': [f'v{i}' for i in 
range(n)]})
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Read full table to get ground-truth _ROW_ID for specific rows
+        read_builder = table.new_read_builder().with_projection(['id', 
'_ROW_ID'])
+        full_splits = read_builder.new_scan().plan().splits()
+        table_read = read_builder.new_read()
+        full_result = table_read.to_arrow(full_splits)
+        # Build id -> _ROW_ID mapping from full scan
+        id_to_row_id = dict(zip(
+            full_result.column('id').to_pylist(),
+            full_result.column('_ROW_ID').to_pylist()))
+
+        # Construct IndexedSplit targeting specific rows
+        data_split = full_splits[0]
+        file_meta = data_split.files[0]
+        first_row_id = file_meta.first_row_id
+
+        from pypaimon.globalindex.indexed_split import IndexedSplit
+        from pypaimon.utils.range import Range
+        target_local_offsets = [50, 300, 700]
+        target_global_ids = [first_row_id + o for o in target_local_offsets]
+        row_ranges = [Range(g, g) for g in target_global_ids]
+        indexed = IndexedSplit(data_split, row_ranges)
+
+        result = table_read.to_arrow([indexed])
+
+        self.assertEqual(result.num_rows, len(target_global_ids))
+        for i in range(result.num_rows):
+            row_id = result.column('_ROW_ID')[i].as_py()
+            data_id = result.column('id')[i].as_py()
+            self.assertEqual(row_id, id_to_row_id[data_id],
+                             f"row id={data_id}: _ROW_ID should be 
{id_to_row_id[data_id]}, got {row_id}")
+
     def test_append_only_multi_write_once_commit(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'])
         
self.catalog.create_table('default.test_append_only_multi_once_commit', schema, 
False)

Reply via email to