This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 21a66d19a0 [python] Data Evolution with_slice should use row id to do slice (#7199)
21a66d19a0 is described below

commit 21a66d19a01297a9ff40d552e672798dcb91f79e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Feb 4 06:04:33 2026 +0800

    [python] Data Evolution with_slice should use row id to do slice (#7199)
---
 .../read/reader/iface/record_batch_reader.py       |   9 ++
 .../read/scanner/data_evolution_split_generator.py | 167 ++++++++++++---------
 paimon-python/pypaimon/read/split_read.py          |  25 ++-
 3 files changed, 113 insertions(+), 88 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
index ec3a1bc424..34a1f5dbbc 100644
--- a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
@@ -126,3 +126,12 @@ class RowPositionRecordIterator(RecordIterator[tuple]):
 
     def __next__(self):
         return self.next()
+
+
+class EmptyRecordBatchReader(RecordBatchReader):
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        return None
+
+    def close(self):
+        return None
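
The new EmptyRecordBatchReader is a null-object reader: when a file turns out to
contribute no rows, the caller still gets a RecordBatchReader, it just reports
end-of-input immediately instead of opening the file. A minimal standalone sketch of
the same pattern (simplified stand-in classes, not the pypaimon API):

    from typing import Optional

    class Batch:
        """Stand-in for an Arrow RecordBatch."""

    class BatchReader:
        def read_batch(self) -> Optional[Batch]:
            raise NotImplementedError

        def close(self) -> None:
            pass

    class EmptyBatchReader(BatchReader):
        """Null-object reader: always signals end-of-input."""

        def read_batch(self) -> Optional[Batch]:
            return None  # no data, the caller stops right away

    def drain(reader: BatchReader) -> int:
        """Count batches until the reader signals end-of-input with None."""
        n = 0
        while reader.read_batch() is not None:
            n += 1
        reader.close()
        return n

    assert drain(EmptyBatchReader()) == 0
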
diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
index f52efb7ae1..6d39628635 100644
--- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -24,7 +24,6 @@ from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
 from pypaimon.read.split import DataSplit, Split
-from pypaimon.read.sliced_split import SlicedSplit
 
 
 class DataEvolutionSplitGenerator(AbstractSplitGenerator):
@@ -65,17 +64,14 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
         for entry in sorted_entries:
            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
-        plan_start_pos = 0
-        plan_end_pos = 0
+        slice_row_ranges = None  # Row ID ranges for slice-based filtering
 
         if self.start_pos_of_this_subtask is not None:
-            # shard data range: [plan_start_pos, plan_end_pos)
-            partitioned_files, plan_start_pos, plan_end_pos = \
-                self._filter_by_row_range(
-                    partitioned_files,
-                    self.start_pos_of_this_subtask,
-                    self.end_pos_of_this_subtask
-                )
+            # Calculate Row ID range for slice-based filtering
+            slice_row_ranges = self._calculate_slice_row_ranges(partitioned_files)
+            if slice_row_ranges:
+                # Filter files by Row ID range
+                partitioned_files = self._filter_files_by_row_ranges(partitioned_files, slice_row_ranges)
         elif self.idx_of_this_subtask is not None:
             partitioned_files = self._filter_by_shard(
                partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks
@@ -109,11 +105,15 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
                 flatten_packed_files, packed_files, sorted_entries_list
             )
 
-        if self.start_pos_of_this_subtask is not None:
-            splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos)
-        # Wrap splits with IndexedSplit if row_ranges is provided
-        if self.row_ranges:
-            splits = self._wrap_to_indexed_splits(splits)
+        # merge slice_row_ranges and self.row_ranges
+        if slice_row_ranges is None:
+            slice_row_ranges = self.row_ranges
+        elif self.row_ranges is not None:
+            slice_row_ranges = Range.and_(slice_row_ranges, self.row_ranges)
+
+        # Wrap splits with IndexedSplit for slice-based filtering or row_ranges
+        if slice_row_ranges:
+            splits = self._wrap_to_indexed_splits(splits, slice_row_ranges)
 
         return splits
 
@@ -171,76 +171,99 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
                 splits.append(split)
         return splits
 
-    def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
+    def _calculate_slice_row_ranges(self, partitioned_files: defaultdict) -> List[Range]:
         """
-        Wrap splits with SlicedSplit to add file-level slicing information.
+        Calculate Row ID ranges for slice-based filtering based on start_pos and end_pos.
         """
-        sliced_splits = []
-        file_end_pos = 0  # end row position of current file in all splits data
+        # Collect all Row ID ranges from files
+        list_ranges = []
+        for file_entries in partitioned_files.values():
+            for entry in file_entries:
+                first_row_id = entry.file.first_row_id
+                # Range is inclusive [from_, to], so use row_count - 1
+                list_ranges.append(Range(first_row_id, first_row_id + entry.file.row_count - 1))
 
-        for split in splits:
-            # Compute file index map for both data and blob files
-            # Blob files share the same row position tracking as data files
-            shard_file_idx_map = self._compute_slice_split_file_idx_map(
-                plan_start_pos, plan_end_pos, split, file_end_pos
-            )
-            file_end_pos = shard_file_idx_map[self.NEXT_POS_KEY]
-            del shard_file_idx_map[self.NEXT_POS_KEY]
-            
-            if shard_file_idx_map:
-                sliced_splits.append(SlicedSplit(split, shard_file_idx_map))
-            else:
-                sliced_splits.append(split)
+        # Merge overlapping ranges
+        sorted_ranges = Range.sort_and_merge_overlap(list_ranges, True, False)
 
-        return sliced_splits
+        # Calculate the Row ID range for this slice
+        start_range, end_range = self._divide_ranges_by_position(sorted_ranges)
+        if start_range is None or end_range is None:
+            return []
 
-    def _filter_by_row_range(
-        self,
-        partitioned_files: defaultdict,
-        start_pos: int,
-        end_pos: int
-    ) -> tuple:
+        # Return the range for this slice
+        return [Range(start_range.from_, end_range.to)]
+
+    def _divide_ranges_by_position(self, sorted_ranges: List[Range]) -> Tuple[Optional[Range], Optional[Range]]:
         """
-        Filter file entries by row range for data evolution tables.
+        Divide ranges by position (start_pos, end_pos) to get the Row ID range for this slice.
+        """
+        if not sorted_ranges:
+            return None, None
+
+        total_row_count = sum(r.count() for r in sorted_ranges)
+        start_pos = self.start_pos_of_this_subtask
+        end_pos = self.end_pos_of_this_subtask
+
+        if start_pos >= total_row_count:
+            return None, None
+
+        # Find the start Row ID
+        current_pos = 0
+        start_row_id = None
+        end_row_id = None
+
+        for r in sorted_ranges:
+            range_end_pos = current_pos + r.count()
+
+            # Find start Row ID
+            if start_row_id is None and start_pos < range_end_pos:
+                offset = start_pos - current_pos
+                start_row_id = r.from_ + offset
+
+            # Find end Row ID
+            if end_pos <= range_end_pos:
+                offset = end_pos - current_pos
+                end_row_id = r.from_ + offset - 1  # -1 because end_pos is exclusive
+                break
+
+            current_pos = range_end_pos
+
+        if start_row_id is None:
+            return None, None
+        if end_row_id is None:
+            end_row_id = sorted_ranges[-1].to
+
+        return Range(start_row_id, start_row_id), Range(end_row_id, end_row_id)
+
+    @staticmethod
+    def _filter_files_by_row_ranges(partitioned_files: defaultdict, row_ranges: List[Range]) -> defaultdict:
+        """
+        Filter files by Row ID ranges. Keep files that overlap with the given ranges.
         """
-        plan_start_pos = 0
-        plan_end_pos = 0
-        entry_end_pos = 0  # end row position of current file in all data
-        splits_start_pos = 0
         filtered_partitioned_files = defaultdict(list)
 
-        # Iterate through all file entries to find files that overlap with current shard range
         for key, file_entries in partitioned_files.items():
             filtered_entries = []
-            blob_added = False  # If it is true, all blobs corresponding to this data file are added
+
             for entry in file_entries:
-                if self._is_blob_file(entry.file.file_name):
-                    if blob_added:
-                        filtered_entries.append(entry)
-                    continue
-                blob_added = False
-                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
-                entry_end_pos += entry.file.row_count  # Update to row position after current file
-
-                # If current file is completely after shard range, stop iteration
-                if entry_begin_pos >= end_pos:
-                    break
-                # If current file is completely before shard range, skip it
-                if entry_end_pos <= start_pos:
-                    continue
-                if entry_begin_pos <= start_pos < entry_end_pos:
-                    splits_start_pos = entry_begin_pos
-                    plan_start_pos = start_pos - entry_begin_pos
-                # If shard end position is within current file, record relative end position
-                if entry_begin_pos < end_pos <= entry_end_pos:
-                    plan_end_pos = end_pos - splits_start_pos
-                # Add files that overlap with shard range to result
-                filtered_entries.append(entry)
-                blob_added = True
+                first_row_id = entry.file.first_row_id
+                file_range = Range(first_row_id, first_row_id + entry.file.row_count - 1)
+
+                # Check if file overlaps with any of the row ranges
+                overlaps = False
+                for r in row_ranges:
+                    if r.overlaps(file_range):
+                        overlaps = True
+                        break
+
+                if overlaps:
+                    filtered_entries.append(entry)
+
             if filtered_entries:
                 filtered_partitioned_files[key] = filtered_entries
 
-        return filtered_partitioned_files, plan_start_pos, plan_end_pos
+        return filtered_partitioned_files
 
     def _filter_by_shard(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict:
         list_ranges = []
@@ -413,7 +436,7 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
         shard_file_idx_map[self.NEXT_POS_KEY] = next_pos
         return shard_file_idx_map
 
-    def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
+    def _wrap_to_indexed_splits(self, splits: List[Split], row_ranges: List[Range]) -> List[Split]:
         """
         Wrap splits with IndexedSplit for row range filtering.
         """
@@ -438,7 +461,7 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
             file_ranges = Range.merge_sorted_as_possible(file_ranges)
 
             # Intersect with row_ranges from global index
-            expected = Range.and_(file_ranges, self.row_ranges)
+            expected = Range.and_(file_ranges, row_ranges)
 
             if not expected:
                 # No intersection, skip this split
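
The heart of the change above is the pair _calculate_slice_row_ranges /
_divide_ranges_by_position: instead of carrying plan_start_pos / plan_end_pos offsets
through the files, the subtask's [start_pos, end_pos) position slice is translated
into an inclusive row-id range over the merged first_row_id ranges, and files and
splits are then filtered against that range. A self-contained sketch of that
translation, using a simplified Range stand-in rather than pypaimon's class and
assuming the input ranges are already sorted, merged and non-overlapping:

    from dataclasses import dataclass
    from typing import List, Optional, Tuple

    @dataclass
    class Range:
        """Inclusive [from_, to] row-id range (simplified stand-in)."""
        from_: int
        to: int

        def count(self) -> int:
            return self.to - self.from_ + 1

    def slice_to_row_ids(sorted_ranges: List[Range],
                         start_pos: int,
                         end_pos: int) -> Optional[Tuple[int, int]]:
        """Map a [start_pos, end_pos) slice of positions to inclusive row ids.

        Positions count rows across the merged ranges in order, skipping the
        gaps between them; returns None when the slice selects nothing.
        """
        total = sum(r.count() for r in sorted_ranges)
        if not sorted_ranges or start_pos >= total:
            return None

        start_row_id = end_row_id = None
        current_pos = 0
        for r in sorted_ranges:
            range_end_pos = current_pos + r.count()
            if start_row_id is None and start_pos < range_end_pos:
                start_row_id = r.from_ + (start_pos - current_pos)
            if end_pos <= range_end_pos:
                end_row_id = r.from_ + (end_pos - current_pos) - 1  # end_pos is exclusive
                break
            current_pos = range_end_pos

        if end_row_id is None:
            end_row_id = sorted_ranges[-1].to
        return start_row_id, end_row_id

    # Two files covering row ids [0, 9] and [20, 29]; the position slice [5, 15)
    # maps to row ids 5 through 24, spanning the gap between the files.
    assert slice_to_row_ids([Range(0, 9), Range(20, 29)], 5, 15) == (5, 24)
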
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index e9b2f39877..2088310aa4 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -25,6 +25,7 @@ from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.deletionvectors import ApplyDeletionVectorReader
 from pypaimon.deletionvectors.deletion_vector import DeletionVector
+from pypaimon.globalindex import Range
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
@@ -44,7 +45,7 @@ from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
 from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
 from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
-                                                            RowPositionReader)
+                                                            RowPositionReader, EmptyRecordBatchReader)
 from pypaimon.read.reader.iface.record_reader import RecordReader
 from pypaimon.read.reader.key_value_unwrap_reader import \
     KeyValueUnwrapRecordReader
@@ -593,26 +594,18 @@ class DataEvolutionSplitRead(SplitRead):
     def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> Optional[RecordReader]:
         """Create a file reader for a single file."""
         def create_record_reader():
-            record_reader = self.file_reader_supplier(
+            return self.file_reader_supplier(
                 file=file,
                 for_merge_read=False,
                 read_fields=read_fields,
                 row_tracking_enabled=True)
-            if self.row_ranges is not None:
-                record_reader = RowIdFilterRecordBatchReader(record_reader, file.first_row_id, self.row_ranges)
-            return record_reader
-
-        shard_file_idx_map = (
-            self.split.shard_file_idx_map() if isinstance(self.split, SlicedSplit) else {}
-        )
-        if file.file_name in shard_file_idx_map:
-            (begin_pos, end_pos) = shard_file_idx_map[file.file_name]
-            if (begin_pos, end_pos) == (-1, -1):
-                return None
-            else:
-                return ShardBatchReader(create_record_reader(), begin_pos, end_pos)
-        else:
+        if self.row_ranges is None:
             return create_record_reader()
+        file_range = Range(file.first_row_id, file.first_row_id + file.row_count - 1)
+        row_ranges = Range.and_(self.row_ranges, [file_range])
+        if len(row_ranges) == 0:
+            return EmptyRecordBatchReader()
+        return RowIdFilterRecordBatchReader(create_record_reader(), file.first_row_id, row_ranges)
 
     def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
         """Split files into field bunches."""

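In split_read.py the row-id filter is now applied per file: the requested ranges are
intersected with the file's own span [first_row_id, first_row_id + row_count - 1], an
empty intersection short-circuits to EmptyRecordBatchReader, and otherwise the reader
is wrapped with RowIdFilterRecordBatchReader over only the clipped ranges. A small
sketch of that dispatch, with readers stubbed as strings and ranges as inclusive
(from_, to) tuples (simplified stand-ins, not the real classes):

    from typing import List, Optional, Tuple

    Range = Tuple[int, int]  # inclusive (from_, to) row-id range, simplified stand-in

    def intersect(ranges: List[Range], file_range: Range) -> List[Range]:
        """Clip the requested row-id ranges to a single file's span."""
        lo, hi = file_range
        clipped = []
        for a, b in ranges:
            start, end = max(a, lo), min(b, hi)
            if start <= end:
                clipped.append((start, end))
        return clipped

    def choose_reader(requested: Optional[List[Range]],
                      first_row_id: int,
                      row_count: int) -> str:
        """Mirror the per-file dispatch (readers stubbed as strings)."""
        if requested is None:
            return "plain reader"                      # no row-id filtering asked for
        file_range = (first_row_id, first_row_id + row_count - 1)
        clipped = intersect(requested, file_range)
        if not clipped:
            return "empty reader"                      # file contributes no rows
        return f"row-id filter reader over {clipped}"  # filter only within this file

    # File holds row ids [100, 199]; the plan asks for [0, 120] and [300, 400].
    print(choose_reader([(0, 120), (300, 400)], first_row_id=100, row_count=100))
    # -> row-id filter reader over [(100, 120)]
    print(choose_reader([(300, 400)], first_row_id=100, row_count=100))
    # -> empty reader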