This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 21a66d19a0 [python] Data Evolution with_slice should use row id to do slice (#7199)
21a66d19a0 is described below
commit 21a66d19a01297a9ff40d552e672798dcb91f79e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Feb 4 06:04:33 2026 +0800
[python] Data Evolution with_slice should use row id to do slice (#7199)
---
.../read/reader/iface/record_batch_reader.py | 9 ++
.../read/scanner/data_evolution_split_generator.py | 167 ++++++++++++---------
paimon-python/pypaimon/read/split_read.py | 25 ++-
3 files changed, 113 insertions(+), 88 deletions(-)
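
At a high level, the patch replaces position-based slicing with row-id-based slicing: a subtask's slice is translated into an inclusive row-id range, and data files are kept only when their [first_row_id, first_row_id + row_count - 1] span overlaps that range. The sketch below illustrates the idea only; RowIdRange and FileMeta are simplified stand-ins for pypaimon's Range and DataFileMeta, not the library classes.

# Illustrative sketch only; RowIdRange and FileMeta stand in for pypaimon's
# Range and DataFileMeta, they are not the real classes.
from dataclasses import dataclass


@dataclass(frozen=True)
class RowIdRange:
    from_: int  # inclusive
    to: int     # inclusive

    def overlaps(self, other: "RowIdRange") -> bool:
        # Two inclusive ranges overlap when neither ends before the other starts.
        return self.from_ <= other.to and other.from_ <= self.to


@dataclass(frozen=True)
class FileMeta:
    name: str
    first_row_id: int
    row_count: int

    def row_id_range(self) -> RowIdRange:
        # Inclusive span of row ids stored in this file.
        return RowIdRange(self.first_row_id, self.first_row_id + self.row_count - 1)


def filter_files_by_row_range(files, slice_range: RowIdRange):
    # Keep only files whose row-id span overlaps the slice's row-id range.
    return [f for f in files if f.row_id_range().overlaps(slice_range)]


if __name__ == "__main__":
    files = [FileMeta("f0", 0, 100), FileMeta("f1", 100, 100), FileMeta("f2", 200, 100)]
    # A slice covering row ids 150..249 touches f1 and f2 but not f0.
    print([f.name for f in filter_files_by_row_range(files, RowIdRange(150, 249))])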
diff --git a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
index ec3a1bc424..34a1f5dbbc 100644
--- a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
@@ -126,3 +126,12 @@ class RowPositionRecordIterator(RecordIterator[tuple]):
def __next__(self):
return self.next()
+
+
+class EmptyRecordBatchReader(RecordBatchReader):
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ return None
+
+ def close(self):
+ return None
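
EmptyRecordBatchReader relies on the RecordBatchReader convention that a None result from read_arrow_batch() means end of input, so downstream loops terminate without emitting any rows. A minimal sketch of that contract, using a simplified reader rather than pypaimon's actual base class:

# Sketch of the reader contract only; the classes below are simplified
# stand-ins, not pypaimon's RecordBatchReader interface.
from typing import Iterator, Optional

import pyarrow as pa


class EmptyBatchReader:
    def read_arrow_batch(self) -> Optional[pa.RecordBatch]:
        # Returning None tells the caller there is nothing left to read.
        return None

    def close(self) -> None:
        return None


def drain(reader) -> Iterator[pa.RecordBatch]:
    # Typical consumption loop: read until the reader returns None, then close it.
    try:
        while True:
            batch = reader.read_arrow_batch()
            if batch is None:
                return
            yield batch
    finally:
        reader.close()


if __name__ == "__main__":
    print(list(drain(EmptyBatchReader())))  # -> []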
diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
index f52efb7ae1..6d39628635 100644
--- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -24,7 +24,6 @@ from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
from pypaimon.read.split import DataSplit, Split
-from pypaimon.read.sliced_split import SlicedSplit
class DataEvolutionSplitGenerator(AbstractSplitGenerator):
@@ -65,17 +64,14 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
for entry in sorted_entries:
partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
- plan_start_pos = 0
- plan_end_pos = 0
+ slice_row_ranges = None # Row ID ranges for slice-based filtering
if self.start_pos_of_this_subtask is not None:
- # shard data range: [plan_start_pos, plan_end_pos)
- partitioned_files, plan_start_pos, plan_end_pos = \
- self._filter_by_row_range(
- partitioned_files,
- self.start_pos_of_this_subtask,
- self.end_pos_of_this_subtask
- )
+ # Calculate Row ID range for slice-based filtering
+ slice_row_ranges = self._calculate_slice_row_ranges(partitioned_files)
+ if slice_row_ranges:
+ # Filter files by Row ID range
+ partitioned_files = self._filter_files_by_row_ranges(partitioned_files, slice_row_ranges)
elif self.idx_of_this_subtask is not None:
partitioned_files = self._filter_by_shard(
partitioned_files, self.idx_of_this_subtask,
self.number_of_para_subtasks
@@ -109,11 +105,15 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
flatten_packed_files, packed_files, sorted_entries_list
)
- if self.start_pos_of_this_subtask is not None:
- splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos)
- # Wrap splits with IndexedSplit if row_ranges is provided
- if self.row_ranges:
- splits = self._wrap_to_indexed_splits(splits)
+ # merge slice_row_ranges and self.row_ranges
+ if slice_row_ranges is None:
+ slice_row_ranges = self.row_ranges
+ elif self.row_ranges is not None:
+ slice_row_ranges = Range.and_(slice_row_ranges, self.row_ranges)
+
+ # Wrap splits with IndexedSplit for slice-based filtering or row_ranges
+ if slice_row_ranges:
+ splits = self._wrap_to_indexed_splits(splits, slice_row_ranges)
return splits
@@ -171,76 +171,99 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
splits.append(split)
return splits
- def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
+ def _calculate_slice_row_ranges(self, partitioned_files: defaultdict) -> List[Range]:
"""
- Wrap splits with SlicedSplit to add file-level slicing information.
+ Calculate Row ID ranges for slice-based filtering based on start_pos and end_pos.
"""
- sliced_splits = []
- file_end_pos = 0 # end row position of current file in all splits data
+ # Collect all Row ID ranges from files
+ list_ranges = []
+ for file_entries in partitioned_files.values():
+ for entry in file_entries:
+ first_row_id = entry.file.first_row_id
+ # Range is inclusive [from_, to], so use row_count - 1
+ list_ranges.append(Range(first_row_id, first_row_id + entry.file.row_count - 1))
- for split in splits:
- # Compute file index map for both data and blob files
- # Blob files share the same row position tracking as data files
- shard_file_idx_map = self._compute_slice_split_file_idx_map(
- plan_start_pos, plan_end_pos, split, file_end_pos
- )
- file_end_pos = shard_file_idx_map[self.NEXT_POS_KEY]
- del shard_file_idx_map[self.NEXT_POS_KEY]
-
- if shard_file_idx_map:
- sliced_splits.append(SlicedSplit(split, shard_file_idx_map))
- else:
- sliced_splits.append(split)
+ # Merge overlapping ranges
+ sorted_ranges = Range.sort_and_merge_overlap(list_ranges, True, False)
- return sliced_splits
+ # Calculate the Row ID range for this slice
+ start_range, end_range = self._divide_ranges_by_position(sorted_ranges)
+ if start_range is None or end_range is None:
+ return []
- def _filter_by_row_range(
- self,
- partitioned_files: defaultdict,
- start_pos: int,
- end_pos: int
- ) -> tuple:
+ # Return the range for this slice
+ return [Range(start_range.from_, end_range.to)]
+
+ def _divide_ranges_by_position(self, sorted_ranges: List[Range]) -> Tuple[Optional[Range], Optional[Range]]:
"""
- Filter file entries by row range for data evolution tables.
+ Divide ranges by position (start_pos, end_pos) to get the Row ID range for this slice.
+ """
+ if not sorted_ranges:
+ return None, None
+
+ total_row_count = sum(r.count() for r in sorted_ranges)
+ start_pos = self.start_pos_of_this_subtask
+ end_pos = self.end_pos_of_this_subtask
+
+ if start_pos >= total_row_count:
+ return None, None
+
+ # Find the start Row ID
+ current_pos = 0
+ start_row_id = None
+ end_row_id = None
+
+ for r in sorted_ranges:
+ range_end_pos = current_pos + r.count()
+
+ # Find start Row ID
+ if start_row_id is None and start_pos < range_end_pos:
+ offset = start_pos - current_pos
+ start_row_id = r.from_ + offset
+
+ # Find end Row ID
+ if end_pos <= range_end_pos:
+ offset = end_pos - current_pos
+ end_row_id = r.from_ + offset - 1 # -1 because end_pos is exclusive
+ break
+
+ current_pos = range_end_pos
+
+ if start_row_id is None:
+ return None, None
+ if end_row_id is None:
+ end_row_id = sorted_ranges[-1].to
+
+ return Range(start_row_id, start_row_id), Range(end_row_id, end_row_id)
+
+ @staticmethod
+ def _filter_files_by_row_ranges(partitioned_files: defaultdict, row_ranges: List[Range]) -> defaultdict:
+ """
+ Filter files by Row ID ranges. Keep files that overlap with the given ranges.
"""
- plan_start_pos = 0
- plan_end_pos = 0
- entry_end_pos = 0 # end row position of current file in all data
- splits_start_pos = 0
filtered_partitioned_files = defaultdict(list)
- # Iterate through all file entries to find files that overlap with current shard range
for key, file_entries in partitioned_files.items():
filtered_entries = []
- blob_added = False # If it is true, all blobs corresponding to this data file are added
+
for entry in file_entries:
- if self._is_blob_file(entry.file.file_name):
- if blob_added:
- filtered_entries.append(entry)
- continue
- blob_added = False
- entry_begin_pos = entry_end_pos # Starting row position of current file in all data
- entry_end_pos += entry.file.row_count # Update to row position after current file
-
- # If current file is completely after shard range, stop iteration
- if entry_begin_pos >= end_pos:
- break
- # If current file is completely before shard range, skip it
- if entry_end_pos <= start_pos:
- continue
- if entry_begin_pos <= start_pos < entry_end_pos:
- splits_start_pos = entry_begin_pos
- plan_start_pos = start_pos - entry_begin_pos
- # If shard end position is within current file, record relative end position
- if entry_begin_pos < end_pos <= entry_end_pos:
- plan_end_pos = end_pos - splits_start_pos
- # Add files that overlap with shard range to result
- filtered_entries.append(entry)
- blob_added = True
+ first_row_id = entry.file.first_row_id
+ file_range = Range(first_row_id, first_row_id + entry.file.row_count - 1)
+
+ # Check if file overlaps with any of the row ranges
+ overlaps = False
+ for r in row_ranges:
+ if r.overlaps(file_range):
+ overlaps = True
+ break
+
+ if overlaps:
+ filtered_entries.append(entry)
+
if filtered_entries:
filtered_partitioned_files[key] = filtered_entries
- return filtered_partitioned_files, plan_start_pos, plan_end_pos
+ return filtered_partitioned_files
def _filter_by_shard(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict:
list_ranges = []
@@ -413,7 +436,7 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
shard_file_idx_map[self.NEXT_POS_KEY] = next_pos
return shard_file_idx_map
- def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
+ def _wrap_to_indexed_splits(self, splits: List[Split], row_ranges: List[Range]) -> List[Split]:
"""
Wrap splits with IndexedSplit for row range filtering.
"""
@@ -438,7 +461,7 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
file_ranges = Range.merge_sorted_as_possible(file_ranges)
# Intersect with row_ranges from global index
- expected = Range.and_(file_ranges, self.row_ranges)
+ expected = Range.and_(file_ranges, row_ranges)
if not expected:
# No intersection, skip this split
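
The position-to-row-id arithmetic in _divide_ranges_by_position can be checked with a small worked example. The sketch below mirrors that logic with a SimpleRange stand-in for pypaimon's Range (inclusive from_/to), free-standing start_pos/end_pos arguments instead of the subtask fields, and plain boundary row ids returned instead of single-point Range objects:

# Worked example of the position -> row-id arithmetic; SimpleRange is a
# stand-in for pypaimon's Range, not the real class.
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class SimpleRange:
    from_: int  # inclusive
    to: int     # inclusive

    def count(self) -> int:
        return self.to - self.from_ + 1


def divide_by_position(sorted_ranges: List[SimpleRange],
                       start_pos: int,
                       end_pos: int) -> Tuple[Optional[int], Optional[int]]:
    """Map a [start_pos, end_pos) slice of logical positions onto row ids."""
    if not sorted_ranges or start_pos >= sum(r.count() for r in sorted_ranges):
        return None, None

    current_pos = 0
    start_row_id = None
    end_row_id = None
    for r in sorted_ranges:
        range_end_pos = current_pos + r.count()
        if start_row_id is None and start_pos < range_end_pos:
            start_row_id = r.from_ + (start_pos - current_pos)
        if end_pos <= range_end_pos:
            # end_pos is exclusive, hence the -1 to get an inclusive row id.
            end_row_id = r.from_ + (end_pos - current_pos) - 1
            break
        current_pos = range_end_pos

    if end_row_id is None:
        end_row_id = sorted_ranges[-1].to
    return start_row_id, end_row_id


if __name__ == "__main__":
    # Row ids 0..99 and 200..299 (a gap in between), 200 rows in total.
    ranges = [SimpleRange(0, 99), SimpleRange(200, 299)]
    # Positions [50, 150) land on row ids 50..249.
    print(divide_by_position(ranges, 50, 150))  # -> (50, 249)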
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index e9b2f39877..2088310aa4 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -25,6 +25,7 @@ from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.deletionvectors import ApplyDeletionVectorReader
from pypaimon.deletionvectors.deletion_vector import DeletionVector
+from pypaimon.globalindex import Range
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
@@ -44,7 +45,7 @@ from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.format_lance_reader import FormatLanceReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
- RowPositionReader)
+ RowPositionReader, EmptyRecordBatchReader)
from pypaimon.read.reader.iface.record_reader import RecordReader
from pypaimon.read.reader.key_value_unwrap_reader import \
KeyValueUnwrapRecordReader
@@ -593,26 +594,18 @@ class DataEvolutionSplitRead(SplitRead):
def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> Optional[RecordReader]:
"""Create a file reader for a single file."""
def create_record_reader():
- record_reader = self.file_reader_supplier(
+ return self.file_reader_supplier(
file=file,
for_merge_read=False,
read_fields=read_fields,
row_tracking_enabled=True)
- if self.row_ranges is not None:
- record_reader = RowIdFilterRecordBatchReader(record_reader, file.first_row_id, self.row_ranges)
- return record_reader
-
- shard_file_idx_map = (
- self.split.shard_file_idx_map() if isinstance(self.split, SlicedSplit) else {}
- )
- if file.file_name in shard_file_idx_map:
- (begin_pos, end_pos) = shard_file_idx_map[file.file_name]
- if (begin_pos, end_pos) == (-1, -1):
- return None
- else:
- return ShardBatchReader(create_record_reader(), begin_pos, end_pos)
- else:
+ if self.row_ranges is None:
return create_record_reader()
+ file_range = Range(file.first_row_id, file.first_row_id + file.row_count - 1)
+ row_ranges = Range.and_(self.row_ranges, [file_range])
+ if len(row_ranges) == 0:
+ return EmptyRecordBatchReader()
+ return RowIdFilterRecordBatchReader(create_record_reader(), file.first_row_id, row_ranges)
def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
"""Split files into field bunches."""