This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8bbda0b467 [python] Rename to TableScan.withSlice to specify start_pos and end_pos
8bbda0b467 is described below
commit 8bbda0b467c1925122d4b477e3577bbb0791c4d7
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jan 5 10:38:18 2026 +0800
[python] Rename to TableScan.withSlice to specify start_pos and end_pos
---
.../pypaimon/read/reader/shard_batch_reader.py | 24 +--
.../pypaimon/read/scanner/full_starting_scanner.py | 181 +++++++++++----------
paimon-python/pypaimon/read/split_read.py | 12 +-
paimon-python/pypaimon/read/table_scan.py | 8 +-
paimon-python/pypaimon/tests/blob_table_test.py | 8 +-
5 files changed, 115 insertions(+), 118 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/shard_batch_reader.py b/paimon-python/pypaimon/read/reader/shard_batch_reader.py
index 0baf63e534..cfe8193df5 100644
--- a/paimon-python/pypaimon/read/reader/shard_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/shard_batch_reader.py
@@ -25,17 +25,17 @@ class ShardBatchReader(RecordBatchReader):
"""
A reader that reads a subset of rows from a data file
"""
- def __init__(self, reader, start_row, end_row):
+ def __init__(self, reader, start_pos, end_pos):
self.reader = reader
- self.start_row = start_row
- self.end_row = end_row
- self.current_row = 0
+ self.start_pos = start_pos
+ self.end_pos = end_pos
+ self.current_pos = 0
def read_arrow_batch(self) -> Optional[RecordBatch]:
# Check if reader is FormatBlobReader (blob type)
if isinstance(self.reader.format_reader, FormatBlobReader):
# For blob reader, pass begin_idx and end_idx parameters
- return self.reader.read_arrow_batch(start_idx=self.start_row, end_idx=self.end_row)
+ return self.reader.read_arrow_batch(start_idx=self.start_pos, end_idx=self.end_pos)
else:
# For non-blob reader (DataFileBatchReader), use standard read_arrow_batch
batch = self.reader.read_arrow_batch()
@@ -44,16 +44,16 @@ class ShardBatchReader(RecordBatchReader):
return None
# Apply row range filtering for non-blob readers
- batch_begin = self.current_row
- self.current_row += batch.num_rows
+ batch_begin = self.current_pos
+ self.current_pos += batch.num_rows
# Check if batch is within the desired range
- if self.start_row <= batch_begin < self.current_row <= self.end_row: # batch is within the desired range
+ if self.start_pos <= batch_begin < self.current_pos <= self.end_pos: # batch is within the desired range
return batch
- elif batch_begin < self.start_row < self.current_row: # batch starts before the desired range
- return batch.slice(self.start_row - batch_begin, self.end_row - self.start_row)
- elif batch_begin < self.end_row < self.current_row: # batch ends after the desired range
- return batch.slice(0, self.end_row - batch_begin)
+ elif batch_begin < self.start_pos < self.current_pos: # batch starts before the desired range
+ return batch.slice(self.start_pos - batch_begin, self.end_pos - self.start_pos)
+ elif batch_begin < self.end_pos < self.current_pos: # batch ends after the desired range
+ return batch.slice(0, self.end_pos - batch_begin)
else: # batch is outside the desired range
return self.read_arrow_batch()
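
For context, the renamed [start_pos, end_pos) filtering that ShardBatchReader applies to non-blob batches can be exercised on its own. A simplified standalone sketch (illustration only, not a drop-in copy of the class above), assuming only pyarrow:

import pyarrow as pa

def slice_batch(batch: pa.RecordBatch, batch_begin: int, start_pos: int, end_pos: int):
    """Return the part of `batch` that falls inside [start_pos, end_pos), or None."""
    batch_end = batch_begin + batch.num_rows
    if start_pos <= batch_begin and batch_end <= end_pos:  # batch fully inside the range
        return batch
    if batch_begin < start_pos < batch_end:  # batch starts before the range
        return batch.slice(start_pos - batch_begin, min(end_pos, batch_end) - start_pos)
    if batch_begin < end_pos < batch_end:  # batch ends after the range
        return batch.slice(0, end_pos - batch_begin)
    return None  # batch completely outside the range

batch = pa.RecordBatch.from_pydict({"id": list(range(10))})
print(slice_batch(batch, 0, start_pos=2, end_pos=4).to_pydict())  # {'id': [2, 3]}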
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 3698f9de6f..3765baffa6 100755
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -63,8 +63,8 @@ class FullStartingScanner(StartingScanner):
self.idx_of_this_subtask = None
self.number_of_para_subtasks = None
- self.start_row_of_this_subtask = None
- self.end_row_of_this_subtask = None
+ self.start_pos_of_this_subtask = None
+ self.end_pos_of_this_subtask = None
self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False
self.data_evolution = options.data_evolution_enabled()
@@ -123,54 +123,55 @@ class FullStartingScanner(StartingScanner):
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
if idx_of_this_subtask >= number_of_para_subtasks:
raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
- if self.start_row_of_this_subtask is not None:
- raise Exception("with_shard and with_row_range cannot be used
simultaneously")
+ if self.start_pos_of_this_subtask is not None:
+ raise Exception("with_shard and with_slice cannot be used
simultaneously")
self.idx_of_this_subtask = idx_of_this_subtask
self.number_of_para_subtasks = number_of_para_subtasks
return self
- def with_row_range(self, start_row, end_row) -> 'FullStartingScanner':
- if start_row >= end_row:
- raise Exception("start_row must be less than end_row")
+ def with_slice(self, start_pos, end_pos) -> 'FullStartingScanner':
+ if start_pos >= end_pos:
+ raise Exception("start_pos must be less than end_pos")
if self.idx_of_this_subtask is not None:
- raise Exception("with_row_range and with_shard cannot be used
simultaneously")
- self.start_row_of_this_subtask = start_row
- self.end_row_of_this_subtask = end_row
+ raise Exception("with_slice and with_shard cannot be used
simultaneously")
+ self.start_pos_of_this_subtask = start_pos
+ self.end_pos_of_this_subtask = end_pos
return self
- def _append_only_filter_by_row_range(self, partitioned_files: defaultdict,
- start_row: int,
- end_row: int) -> (defaultdict, int, int):
- plan_start_row = 0
- plan_end_row = 0
- entry_end_row = 0 # end row position of current file in all data
- splits_start_row = 0
+ @staticmethod
+ def _append_only_filter_by_slice(partitioned_files: defaultdict,
+ start_pos: int,
+ end_pos: int) -> (defaultdict, int, int):
+ plan_start_pos = 0
+ plan_end_pos = 0
+ entry_end_pos = 0 # end row position of current file in all data
+ splits_start_pos = 0
filtered_partitioned_files = defaultdict(list)
# Iterate through all file entries to find files that overlap with current shard range
for key, file_entries in partitioned_files.items():
filtered_entries = []
for entry in file_entries:
- entry_begin_row = entry_end_row # Starting row position of current file in all data
- entry_end_row += entry.file.row_count # Update to row position after current file
+ entry_begin_pos = entry_end_pos # Starting row position of current file in all data
+ entry_end_pos += entry.file.row_count # Update to row position after current file
# If current file is completely after shard range, stop iteration
- if entry_begin_row >= end_row:
+ if entry_begin_pos >= end_pos:
break
# If current file is completely before shard range, skip it
- if entry_end_row <= start_row:
+ if entry_end_pos <= start_pos:
continue
- if entry_begin_row <= start_row < entry_end_row:
- splits_start_row = entry_begin_row
- plan_start_row = start_row - entry_begin_row
+ if entry_begin_pos <= start_pos < entry_end_pos:
+ splits_start_pos = entry_begin_pos
+ plan_start_pos = start_pos - entry_begin_pos
# If shard end position is within current file, record relative end position
- if entry_begin_row < end_row <= entry_end_row:
- plan_end_row = end_row - splits_start_row
+ if entry_begin_pos < end_pos <= entry_end_pos:
+ plan_end_pos = end_pos - splits_start_pos
# Add files that overlap with shard range to result
filtered_entries.append(entry)
if filtered_entries:
filtered_partitioned_files[key] = filtered_entries
- return filtered_partitioned_files, plan_start_row, plan_end_row
+ return filtered_partitioned_files, plan_start_pos, plan_end_pos
def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
"""
@@ -191,23 +192,23 @@ class FullStartingScanner(StartingScanner):
# Each of the first 'remainder' shards gets one extra row
if self.idx_of_this_subtask < remainder:
num_row = base_rows_per_shard + 1
- start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
+ start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
else:
num_row = base_rows_per_shard
- start_row = (remainder * (base_rows_per_shard + 1) +
+ start_pos = (remainder * (base_rows_per_shard + 1) +
(self.idx_of_this_subtask - remainder) * base_rows_per_shard)
- end_row = start_row + num_row
+ end_pos = start_pos + num_row
- return self._append_only_filter_by_row_range(partitioned_files, start_row, end_row)
+ return self._append_only_filter_by_slice(partitioned_files, start_pos, end_pos)
def _data_evolution_filter_by_row_range(self, partitioned_files: defaultdict,
- start_row: int,
- end_row: int) -> (defaultdict, int, int):
- plan_start_row = 0
- plan_end_row = 0
- entry_end_row = 0 # end row position of current file in all data
- splits_start_row = 0
+ start_pos: int,
+ end_pos: int) -> (defaultdict, int, int):
+ plan_start_pos = 0
+ plan_end_pos = 0
+ entry_end_pos = 0 # end row position of current file in all data
+ splits_start_pos = 0
filtered_partitioned_files = defaultdict(list)
# Iterate through all file entries to find files that overlap with current shard range
for key, file_entries in partitioned_files.items():
@@ -219,28 +220,28 @@ class FullStartingScanner(StartingScanner):
filtered_entries.append(entry)
continue
blob_added = False
- entry_begin_row = entry_end_row # Starting row position of current file in all data
- entry_end_row += entry.file.row_count # Update to row position after current file
+ entry_begin_pos = entry_end_pos # Starting row position of current file in all data
+ entry_end_pos += entry.file.row_count # Update to row position after current file
# If current file is completely after shard range, stop iteration
- if entry_begin_row >= end_row:
+ if entry_begin_pos >= end_pos:
break
# If current file is completely before shard range, skip it
- if entry_end_row <= start_row:
+ if entry_end_pos <= start_pos:
continue
- if entry_begin_row <= start_row < entry_end_row:
- splits_start_row = entry_begin_row
- plan_start_row = start_row - entry_begin_row
+ if entry_begin_pos <= start_pos < entry_end_pos:
+ splits_start_pos = entry_begin_pos
+ plan_start_pos = start_pos - entry_begin_pos
# If shard end position is within current file, record relative end position
- if entry_begin_row < end_row <= entry_end_row:
- plan_end_row = end_row - splits_start_row
+ if entry_begin_pos < end_pos <= entry_end_pos:
+ plan_end_pos = end_pos - splits_start_pos
# Add files that overlap with shard range to result
filtered_entries.append(entry)
blob_added = True
if filtered_entries:
filtered_partitioned_files[key] = filtered_entries
- return filtered_partitioned_files, plan_start_row, plan_end_row
+ return filtered_partitioned_files, plan_start_pos, plan_end_pos
def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
total_row = 0
@@ -257,36 +258,36 @@ class FullStartingScanner(StartingScanner):
# Each of the first 'remainder' shards gets one extra row
if self.idx_of_this_subtask < remainder:
num_row = base_rows_per_shard + 1
- start_row = self.idx_of_this_subtask * (base_rows_per_shard + 1)
+ start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
else:
num_row = base_rows_per_shard
- start_row = (remainder * (base_rows_per_shard + 1) +
+ start_pos = (remainder * (base_rows_per_shard + 1) +
(self.idx_of_this_subtask - remainder) * base_rows_per_shard)
- end_row = start_row + num_row
- return self._data_evolution_filter_by_row_range(partitioned_files, start_row, end_row)
+ end_pos = start_pos + num_row
+ return self._data_evolution_filter_by_row_range(partitioned_files, start_pos, end_pos)
- def _compute_split_start_end_row(self, splits: List[Split], plan_start_row, plan_end_row):
+ def _compute_split_start_end_pos(self, splits: List[Split], plan_start_pos, plan_end_pos):
"""
Find files that needs to be divided for each split
:param splits: splits
- :param plan_start_row: plan begin row in all splits data
- :param plan_end_row: plan end row in all splits data
+ :param plan_start_pos: plan begin row in all splits data
+ :param plan_end_pos: plan end row in all splits data
"""
- file_end_row = 0 # end row position of current file in all splits data
+ file_end_pos = 0 # end row position of current file in all splits data
for split in splits:
- cur_split_end_row = file_end_row
+ cur_split_end_pos = file_end_pos
# Compute split_file_idx_map for data files
- file_end_row = self._compute_split_file_idx_map(plan_start_row, plan_end_row,
- split, cur_split_end_row, False)
+ file_end_pos = self._compute_split_file_idx_map(plan_start_pos, plan_end_pos,
+ split, cur_split_end_pos, False)
# Compute split_file_idx_map for blob files
if self.data_evolution:
- self._compute_split_file_idx_map(plan_start_row, plan_end_row,
- split, cur_split_end_row, True)
+ self._compute_split_file_idx_map(plan_start_pos, plan_end_pos,
+ split, cur_split_end_pos, True)
- def _compute_split_file_idx_map(self, plan_start_row, plan_end_row, split: Split,
- file_end_row: int, is_blob: bool = False):
+ def _compute_split_file_idx_map(self, plan_start_pos, plan_end_pos, split: Split,
+ file_end_pos: int, is_blob: bool = False):
"""
Traverse all the files in current split, find the starting shard and ending shard files,
and add them to shard_file_idx_map;
@@ -301,20 +302,20 @@ class FullStartingScanner(StartingScanner):
if is_blob and not self._is_blob_file(file.file_name):
continue
row_cnt += file.row_count
- file_begin_row = file_end_row # Starting row position of current file in all data
- file_end_row += file.row_count # Update to row position after current file
- if file_begin_row <= plan_start_row < plan_end_row <= file_end_row:
+ file_begin_pos = file_end_pos # Starting row position of current file in all data
+ file_end_pos += file.row_count # Update to row position after current file
+ if file_begin_pos <= plan_start_pos < plan_end_pos <= file_end_pos:
split.shard_file_idx_map[file.file_name] = (
- plan_start_row - file_begin_row, plan_end_row - file_begin_row)
+ plan_start_pos - file_begin_pos, plan_end_pos - file_begin_pos)
# If shard start position is within current file, record actual start position and relative offset
- elif file_begin_row < plan_start_row < file_end_row:
- split.shard_file_idx_map[file.file_name] = (plan_start_row - file_begin_row, file.row_count)
+ elif file_begin_pos < plan_start_pos < file_end_pos:
+ split.shard_file_idx_map[file.file_name] = (plan_start_pos - file_begin_pos, file.row_count)
# If shard end position is within current file, record relative end position
- elif file_begin_row < plan_end_row < file_end_row:
- split.shard_file_idx_map[file.file_name] = (0, plan_end_row - file_begin_row)
- elif file_end_row <= plan_start_row or file_begin_row >= plan_end_row:
+ elif file_begin_pos < plan_end_pos < file_end_pos:
+ split.shard_file_idx_map[file.file_name] = (0, plan_end_pos - file_begin_pos)
+ elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
split.shard_file_idx_map[file.file_name] = (-1, -1)
- return file_end_row
+ return file_end_pos
def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
filtered_entries = []
@@ -473,14 +474,14 @@ class FullStartingScanner(StartingScanner):
for entry in file_entries:
partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
- if self.start_row_of_this_subtask is not None:
- # shard data range: [plan_start_row, plan_end_row)
- partitioned_files, plan_start_row, plan_end_row = \
- self._append_only_filter_by_row_range(partitioned_files,
- self.start_row_of_this_subtask,
- self.end_row_of_this_subtask)
+ if self.start_pos_of_this_subtask is not None:
+ # shard data range: [plan_start_pos, plan_end_pos)
+ partitioned_files, plan_start_pos, plan_end_pos = \
+ self._append_only_filter_by_slice(partitioned_files,
+ self.start_pos_of_this_subtask,
+ self.end_pos_of_this_subtask)
elif self.idx_of_this_subtask is not None:
- partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
+ partitioned_files, plan_start_pos, plan_end_pos = self._append_only_filter_by_shard(partitioned_files)
def weight_func(f: DataFileMeta) -> int:
return max(f.file_size, self.open_file_cost)
@@ -495,9 +496,9 @@ class FullStartingScanner(StartingScanner):
packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func,
self.target_split_size)
splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map)
- if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
+ if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
# When files are combined into splits, it is necessary to find files that needs to be divided for each split
- self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
+ self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos)
return splits
def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool:
@@ -583,15 +584,15 @@ class FullStartingScanner(StartingScanner):
for entry in sorted_entries:
partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
- if self.start_row_of_this_subtask is not None:
- # shard data range: [plan_start_row, plan_end_row)
- partitioned_files, plan_start_row, plan_end_row = \
+ if self.start_pos_of_this_subtask is not None:
+ # shard data range: [plan_start_pos, plan_end_pos)
+ partitioned_files, plan_start_pos, plan_end_pos = \
self._data_evolution_filter_by_row_range(partitioned_files,
- self.start_row_of_this_subtask,
- self.end_row_of_this_subtask)
+ self.start_pos_of_this_subtask,
+ self.end_pos_of_this_subtask)
elif self.idx_of_this_subtask is not None:
- # shard data range: [plan_start_row, plan_end_row)
- partitioned_files, plan_start_row, plan_end_row = self._data_evolution_filter_by_shard(partitioned_files)
+ # shard data range: [plan_start_pos, plan_end_pos)
+ partitioned_files, plan_start_pos, plan_end_pos = self._data_evolution_filter_by_shard(partitioned_files)
def weight_func(file_list: List[DataFileMeta]) -> int:
return max(sum(f.file_size for f in file_list), self.open_file_cost)
@@ -618,8 +619,8 @@ class FullStartingScanner(StartingScanner):
splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map)
- if self.start_row_of_this_subtask is not None or self.idx_of_this_subtask is not None:
- self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
+ if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
+ self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos)
return splits
def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
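
The two *_filter_by_shard helpers above both reduce a shard index to a [start_pos, end_pos) slice before delegating to the slice/row-range filtering. A small illustration of that arithmetic (standalone sketch; names mirror the scanner, but this is not the scanner itself):

def shard_range(idx_of_this_subtask: int, number_of_para_subtasks: int, total_row: int):
    """[start_pos, end_pos) owned by one subtask when rows are split as evenly as possible."""
    base_rows_per_shard, remainder = divmod(total_row, number_of_para_subtasks)
    if idx_of_this_subtask < remainder:  # the first `remainder` shards get one extra row
        num_row = base_rows_per_shard + 1
        start_pos = idx_of_this_subtask * (base_rows_per_shard + 1)
    else:
        num_row = base_rows_per_shard
        start_pos = (remainder * (base_rows_per_shard + 1)
                     + (idx_of_this_subtask - remainder) * base_rows_per_shard)
    return start_pos, start_pos + num_row

print([shard_range(i, 3, 10) for i in range(3)])  # [(0, 4), (4, 7), (7, 10)]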
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 8a374f447d..5df6ccb96e 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -334,15 +334,15 @@ class RawFileSplitRead(SplitRead):
read_fields = self._get_final_read_data_fields()
# If the current file needs to be further divided for reading, use ShardBatchReader
if file.file_name in self.split.shard_file_idx_map:
- (start_row, end_row) = self.split.shard_file_idx_map[file.file_name]
- if (start_row, end_row) == (-1, -1):
+ (start_pos, end_pos) = self.split.shard_file_idx_map[file.file_name]
+ if (start_pos, end_pos) == (-1, -1):
return None
else:
file_batch_reader = ShardBatchReader(self.file_reader_supplier(
file=file,
for_merge_read=False,
read_fields=read_fields,
- row_tracking_enabled=True), start_row, end_row)
+ row_tracking_enabled=True), start_pos, end_pos)
else:
file_batch_reader = self.file_reader_supplier(
file=file,
@@ -568,15 +568,15 @@ class DataEvolutionSplitRead(SplitRead):
"""Create a file reader for a single file."""
# If the current file needs to be further divided for reading, use ShardBatchReader
if file.file_name in self.split.shard_file_idx_map:
- (begin_row, end_row) = self.split.shard_file_idx_map[file.file_name]
- if (begin_row, end_row) == (-1, -1):
+ (begin_pos, end_pos) = self.split.shard_file_idx_map[file.file_name]
+ if (begin_pos, end_pos) == (-1, -1):
return None
else:
return ShardBatchReader(self.file_reader_supplier(
file=file,
for_merge_read=False,
read_fields=read_fields,
- row_tracking_enabled=True), begin_row, end_row)
+ row_tracking_enabled=True), begin_pos, end_pos)
else:
return self.file_reader_supplier(
file=file,
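
Both split readers above skip a file whose shard_file_idx_map entry is (-1, -1) and otherwise hand the stored (start_pos, end_pos) to ShardBatchReader. A rough sketch of how a split-level range maps onto per-file ranges (simplified from _compute_split_file_idx_map: plain (name, row_count) pairs stand in for DataFileMeta, and every file gets an entry here, unlike the real method):

def file_ranges(files, plan_start_pos, plan_end_pos):
    """files: iterable of (file_name, row_count) pairs in scan order."""
    ranges, file_end_pos = {}, 0
    for file_name, row_count in files:
        file_begin_pos = file_end_pos
        file_end_pos += row_count
        lo = max(plan_start_pos, file_begin_pos)
        hi = min(plan_end_pos, file_end_pos)
        # (-1, -1) marks a file that lies completely outside the slice
        ranges[file_name] = (lo - file_begin_pos, hi - file_begin_pos) if lo < hi else (-1, -1)
    return ranges

print(file_ranges([("f0", 5), ("f1", 5), ("f2", 5)], 2, 8))
# {'f0': (2, 5), 'f1': (0, 3), 'f2': (-1, -1)}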
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 7f603ed71a..7aac628659 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -72,10 +72,6 @@ class TableScan:
self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
return self
- def with_row_range(self, start_row, end_row) -> 'TableScan':
- """
- Filter file entries by row range. The row_id corresponds to the row position of the
- file in all file entries in table scan's partitioned_files.
- """
- self.starting_scanner.with_row_range(start_row, end_row)
+ def with_slice(self, start_pos, end_pos) -> 'TableScan':
+ self.starting_scanner.with_slice(start_pos, end_pos)
return self
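
With the rename, a reader requests an absolute row slice directly on the scan. A condensed usage example based on the test below (assumes an existing pypaimon table object named `table`):

read_builder = table.new_read_builder()
table_scan = read_builder.new_scan().with_slice(2, 4)  # rows [2, 4) in scan order, previously with_row_range(2, 4)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
result = table_read.to_arrow(splits)  # pyarrow Table containing only the sliced rows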
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index c17cf2c9d4..de59d0398e 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -2373,7 +2373,7 @@ class DataBlobWriterTest(unittest.TestCase):
actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
self.assertEqual(actual, expected)
- def test_data_blob_writer_with_row_range(self):
+ def test_data_blob_writer_with_slice(self):
"""Test DataBlobWriter with mixed data types in blob column."""
# Create schema with blob column
@@ -2390,8 +2390,8 @@ class DataBlobWriterTest(unittest.TestCase):
'data-evolution.enabled': 'true'
}
)
- self.catalog.create_table('test_db.with_row_range_test', schema, False)
- table = self.catalog.get_table('test_db.with_row_range_test')
+ self.catalog.create_table('test_db.with_slice_test', schema, False)
+ table = self.catalog.get_table('test_db.with_slice_test')
# Use proper table API to create writer
write_builder = table.new_batch_write_builder()
@@ -2425,7 +2425,7 @@ class DataBlobWriterTest(unittest.TestCase):
# Read data back using table API
read_builder = table.new_read_builder()
- table_scan = read_builder.new_scan().with_row_range(2, 4)
+ table_scan = read_builder.new_scan().with_slice(2, 4)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
result = table_read.to_arrow(splits)
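
As a sanity check, a slice read can be compared against a full scan of the same table. A hedged sketch (illustrative only; it assumes both reads plan the files in the same order, which is what makes positional slicing meaningful):

def read_with_slice(read_builder, start_pos, end_pos):
    scan = read_builder.new_scan().with_slice(start_pos, end_pos)
    return read_builder.new_read().to_arrow(scan.plan().splits())

def read_full(read_builder):
    scan = read_builder.new_scan()
    return read_builder.new_read().to_arrow(scan.plan().splits())

# Expectation under the same plan order: the slice equals rows [2, 4) of the full read.
# assert read_with_slice(read_builder, 2, 4).equals(read_full(read_builder).slice(2, 2))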