XiaoHongbo-Hope commented on code in PR #7032:
URL: https://github.com/apache/paimon/pull/7032#discussion_r2688886891
##########
paimon-python/pypaimon/read/scanner/full_starting_scanner.py:
##########
@@ -120,209 +115,138 @@ def read_manifest_entries(self, manifest_files:
List[ManifestFileMeta]) -> List[
self._filter_manifest_entry,
max_workers=max_workers)
- def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) ->
'FullStartingScanner':
- if idx_of_this_subtask >= number_of_para_subtasks:
- raise Exception("idx_of_this_subtask must be less than
number_of_para_subtasks")
- if self.start_pos_of_this_subtask is not None:
- raise Exception("with_shard and with_slice cannot be used
simultaneously")
- self.idx_of_this_subtask = idx_of_this_subtask
- self.number_of_para_subtasks = number_of_para_subtasks
- return self
+ def _create_append_only_splits(
+ self, file_entries: List[ManifestEntry], deletion_files_map: dict
= None) -> List['Split']:
+ partitioned_files = defaultdict(list)
+ for entry in file_entries:
+ partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
- def with_slice(self, start_pos, end_pos) -> 'FullStartingScanner':
- if start_pos >= end_pos:
- raise Exception("start_pos must be less than end_pos")
- if self.idx_of_this_subtask is not None:
- raise Exception("with_slice and with_shard cannot be used
simultaneously")
- self.start_pos_of_this_subtask = start_pos
- self.end_pos_of_this_subtask = end_pos
- return self
+ if self._partial_read():
+ partitioned_files = self._filter_by_pos(partitioned_files)
- @staticmethod
- def _append_only_filter_by_slice(partitioned_files: defaultdict,
- start_pos: int,
- end_pos: int) -> (defaultdict, int, int):
- plan_start_pos = 0
- plan_end_pos = 0
- entry_end_pos = 0 # end row position of current file in all data
- splits_start_pos = 0
- filtered_partitioned_files = defaultdict(list)
- # Iterate through all file entries to find files that overlap with
current shard range
+ def weight_func(f: DataFileMeta) -> int:
+ return max(f.file_size, self.open_file_cost)
+
+ splits = []
for key, file_entries in partitioned_files.items():
- filtered_entries = []
- for entry in file_entries:
- entry_begin_pos = entry_end_pos # Starting row position of
current file in all data
- entry_end_pos += entry.file.row_count # Update to row
position after current file
+ if not file_entries:
+ return []
- # If current file is completely after shard range, stop
iteration
- if entry_begin_pos >= end_pos:
- break
- # If current file is completely before shard range, skip it
- if entry_end_pos <= start_pos:
- continue
- if entry_begin_pos <= start_pos < entry_end_pos:
- splits_start_pos = entry_begin_pos
- plan_start_pos = start_pos - entry_begin_pos
- # If shard end position is within current file, record
relative end position
- if entry_begin_pos < end_pos <= entry_end_pos:
- plan_end_pos = end_pos - splits_start_pos
- # Add files that overlap with shard range to result
- filtered_entries.append(entry)
- if filtered_entries:
- filtered_partitioned_files[key] = filtered_entries
+ data_files: List[DataFileMeta] = [e.file for e in file_entries]
- return filtered_partitioned_files, plan_start_pos, plan_end_pos
+ packed_files: List[List[DataFileMeta]] =
self._pack_for_ordered(data_files, weight_func,
+
self.target_split_size)
+ splits += self._build_split_from_pack(packed_files, file_entries,
False, deletion_files_map)
+ if self._partial_read():
+ self._compute_split_pos(splits)
+ return splits
- def _append_only_filter_by_shard(self, partitioned_files: defaultdict) ->
(defaultdict, int, int):
- """
- Filter file entries by shard. Only keep the files within the range,
which means
- that only the starting and ending files need to be further divided
subsequently
- """
- total_row = 0
- # Sort by file creation time to ensure consistent sharding
- for key, file_entries in partitioned_files.items():
- for entry in file_entries:
- total_row += entry.file.row_count
+ def _create_primary_key_splits(
+ self, file_entries: List[ManifestEntry], deletion_files_map: dict
= None) -> List['Split']:
+ if self._partial_read():
+ file_entries = self._filter_by_pos(file_entries)
- # Calculate number of rows this shard should process using balanced
distribution
- # Distribute remainder evenly among first few shards to avoid last
shard overload
- base_rows_per_shard = total_row // self.number_of_para_subtasks
- remainder = total_row % self.number_of_para_subtasks
+ partitioned_files = defaultdict(list)
+ for entry in file_entries:
+ partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
- # Each of the first 'remainder' shards gets one extra row
- if self.idx_of_this_subtask < remainder:
- num_row = base_rows_per_shard + 1
- start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
- else:
- num_row = base_rows_per_shard
- start_pos = (remainder * (base_rows_per_shard + 1) +
- (self.idx_of_this_subtask - remainder) *
base_rows_per_shard)
+ def single_weight_func(f: DataFileMeta) -> int:
+ return max(f.file_size, self.open_file_cost)
- end_pos = start_pos + num_row
+ def weight_func(fl: List[DataFileMeta]) -> int:
+ return max(sum(f.file_size for f in fl), self.open_file_cost)
- return self._append_only_filter_by_slice(partitioned_files, start_pos,
end_pos)
+ merge_engine = self.table.options.merge_engine()
+ merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW
- def _data_evolution_filter_by_row_range(self, partitioned_files:
defaultdict,
- start_pos: int,
- end_pos: int) -> (defaultdict,
int, int):
- plan_start_pos = 0
- plan_end_pos = 0
- entry_end_pos = 0 # end row position of current file in all data
- splits_start_pos = 0
- filtered_partitioned_files = defaultdict(list)
- # Iterate through all file entries to find files that overlap with
current shard range
+ splits = []
for key, file_entries in partitioned_files.items():
- filtered_entries = []
- blob_added = False # If it is true, all blobs corresponding to
this data file are added
- for entry in file_entries:
- if self._is_blob_file(entry.file.file_name):
- if blob_added:
- filtered_entries.append(entry)
- continue
- blob_added = False
- entry_begin_pos = entry_end_pos # Starting row position of
current file in all data
- entry_end_pos += entry.file.row_count # Update to row
position after current file
+ if not file_entries:
+ continue
- # If current file is completely after shard range, stop
iteration
- if entry_begin_pos >= end_pos:
- break
- # If current file is completely before shard range, skip it
- if entry_end_pos <= start_pos:
- continue
- if entry_begin_pos <= start_pos < entry_end_pos:
- splits_start_pos = entry_begin_pos
- plan_start_pos = start_pos - entry_begin_pos
- # If shard end position is within current file, record
relative end position
- if entry_begin_pos < end_pos <= entry_end_pos:
- plan_end_pos = end_pos - splits_start_pos
- # Add files that overlap with shard range to result
- filtered_entries.append(entry)
- blob_added = True
- if filtered_entries:
- filtered_partitioned_files[key] = filtered_entries
+ data_files: List[DataFileMeta] = [e.file for e in file_entries]
- return filtered_partitioned_files, plan_start_pos, plan_end_pos
+ raw_convertible = all(
+ f.level != 0 and self._without_delete_row(f)
+ for f in data_files
+ )
- def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict)
-> (defaultdict, int, int):
- total_row = 0
- for key, file_entries in partitioned_files.items():
- for entry in file_entries:
- if not self._is_blob_file(entry.file.file_name):
- total_row += entry.file.row_count
+ levels = {f.level for f in data_files}
+ one_level = len(levels) == 1
- # Calculate number of rows this shard should process using balanced
distribution
- # Distribute remainder evenly among first few shards to avoid last
shard overload
- base_rows_per_shard = total_row // self.number_of_para_subtasks
- remainder = total_row % self.number_of_para_subtasks
+ use_optimized_path = raw_convertible and (
+ self.deletion_vectors_enabled or merge_engine_first_row or
one_level)
+ if use_optimized_path:
+ packed_files: List[List[DataFileMeta]] =
self._pack_for_ordered(
+ data_files, single_weight_func, self.target_split_size
+ )
+ splits += self._build_split_from_pack(
+ packed_files, file_entries, True, deletion_files_map,
+ use_optimized_path)
+ else:
+ partition_sort_runs: List[List[SortedRun]] =
IntervalPartition(data_files).partition()
+ sections: List[List[DataFileMeta]] = [
+ [file for s in sl for file in s.files]
+ for sl in partition_sort_runs
+ ]
- # Each of the first 'remainder' shards gets one extra row
- if self.idx_of_this_subtask < remainder:
- num_row = base_rows_per_shard + 1
- start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
- else:
- num_row = base_rows_per_shard
- start_pos = (remainder * (base_rows_per_shard + 1) +
- (self.idx_of_this_subtask - remainder) *
base_rows_per_shard)
+ packed_files: List[List[List[DataFileMeta]]] =
self._pack_for_ordered(sections, weight_func,
+
self.target_split_size)
- end_pos = start_pos + num_row
- return self._data_evolution_filter_by_row_range(partitioned_files,
start_pos, end_pos)
+ flatten_packed_files: List[List[DataFileMeta]] = [
+ [file for sub_pack in pack for file in sub_pack]
+ for pack in packed_files
+ ]
+ splits += self._build_split_from_pack(
+ flatten_packed_files, file_entries, True,
+ deletion_files_map, False)
+ return splits
- def _compute_split_start_end_pos(self, splits: List[Split],
plan_start_pos, plan_end_pos):
- """
- Find files that needs to be divided for each split
- :param splits: splits
- :param plan_start_pos: plan begin row in all splits data
- :param plan_end_pos: plan end row in all splits data
- """
- file_end_pos = 0 # end row position of current file in all splits data
+ def _create_data_evolution_splits(
+ self, file_entries: List[ManifestEntry], deletion_files_map: dict
= None) -> List['Split']:
+ def sort_key(manifest_entry: ManifestEntry) -> tuple:
+ first_row_id = manifest_entry.file.first_row_id if
manifest_entry.file.first_row_id is not None else float(
+ '-inf')
+ is_blob = 1 if self._is_blob_file(manifest_entry.file.file_name)
else 0
+ # For files with same firstRowId, sort by maxSequenceNumber in
descending order
+ # (larger sequence number means more recent data)
+ max_seq = manifest_entry.file.max_sequence_number
+ return first_row_id, is_blob, -max_seq
- for split in splits:
- cur_split_end_pos = file_end_pos
- # Compute split_file_idx_map for data files
- file_end_pos = self._compute_split_file_idx_map(plan_start_pos,
plan_end_pos,
- split,
cur_split_end_pos, False)
- # Compute split_file_idx_map for blob files
- if self.data_evolution:
- self._compute_split_file_idx_map(plan_start_pos, plan_end_pos,
- split, cur_split_end_pos,
True)
+ partitioned_files = defaultdict(list)
+ for entry in file_entries:
+ partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
- def _compute_split_file_idx_map(self, plan_start_pos, plan_end_pos, split:
Split,
- file_end_pos: int, is_blob: bool = False):
- """
- Traverse all the files in current split, find the starting shard and
ending shard files,
- and add them to shard_file_idx_map;
- - for data file, only two data files will be divided in all splits.
- - for blob file, perhaps there will be some unnecessary files in
addition to two files(start and end).
- Add them to shard_file_idx_map as well, because they need to be
removed later.
- """
- row_cnt = 0
- for file in split.files:
- if not is_blob and self._is_blob_file(file.file_name):
- continue
- if is_blob and not self._is_blob_file(file.file_name):
+ if self._partial_read():
+ partitioned_files = self._filter_by_pos(partitioned_files)
+
Review Comment:
do we need sort before call _filter_by_pos?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]