XiaoHongbo-Hope commented on code in PR #7032:
URL: https://github.com/apache/paimon/pull/7032#discussion_r2688856885


##########
paimon-python/pypaimon/read/scanner/full_starting_scanner.py:
##########
@@ -753,3 +522,263 @@ def _filter_blob(files: List[DataFileMeta]) -> 
List[DataFileMeta]:
                         result.append(file)
 
         return result
+
+    def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool:
+        # null to true to be compatible with old version
+        if data_file_meta.delete_row_count is None:
+            return True
+        return data_file_meta.delete_row_count == 0
+
+    def _partial_read(self):
+        return False
+
+    def _filter_by_pos(self, files):
+        pass
+
+    def _compute_split_pos(self, splits: List['Split']) -> None:
+        pass
+
+
+class PartialStartingScanner(FullStartingScanner):
+    def __init__(self, table, predicate: Optional[Predicate], limit: 
Optional[int]):
+        super().__init__(table, predicate, limit)
+        # for shard
+        self.idx_of_this_subtask = None
+        self.number_of_para_subtasks = None
+        self.start_pos_of_this_subtask = None
+        self.end_pos_of_this_subtask = None
+        self.plan_start_end_pos = None
+
+    def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 
'FullStartingScanner':
+        if idx_of_this_subtask >= number_of_para_subtasks:
+            raise Exception("idx_of_this_subtask must be less than 
number_of_para_subtasks")
+        if self.start_pos_of_this_subtask is not None:
+            raise Exception("with_shard and with_slice cannot be used 
simultaneously")
+        self.idx_of_this_subtask = idx_of_this_subtask
+        self.number_of_para_subtasks = number_of_para_subtasks
+        return self
+
+    def with_slice(self, start_pos, end_pos) -> 'FullStartingScanner':
+        if start_pos >= end_pos:
+            raise Exception("start_pos must be less than end_pos")
+        if self.idx_of_this_subtask is not None:
+            raise Exception("with_slice and with_shard cannot be used 
simultaneously")
+        self.start_pos_of_this_subtask = start_pos
+        self.end_pos_of_this_subtask = end_pos
+        return self
+
+    def _filter_by_pos(self, files):
+        if self.table.is_primary_key_table:
+            return self._primary_key_filter_by_shard(files)
+        elif self.data_evolution:
+            if self.start_pos_of_this_subtask is not None:
+                # shard data range: [plan_start_pos, plan_end_pos)
+                files, self.plan_start_end_pos = \
+                    self._data_evolution_filter_by_slice(files,
+                                                         
self.start_pos_of_this_subtask,
+                                                         
self.end_pos_of_this_subtask)
+            elif self.idx_of_this_subtask is not None:
+                files, self.plan_start_end_pos = 
self._data_evolution_filter_by_shard(files)
+            return files
+        else:
+            if self.start_pos_of_this_subtask is not None:
+                # shard data range: [plan_start_pos, plan_end_pos)
+                files, self.plan_start_end_pos = \
+                    self._append_only_filter_by_slice(files,
+                                                      
self.start_pos_of_this_subtask,
+                                                      
self.end_pos_of_this_subtask)
+            elif self.idx_of_this_subtask is not None:
+                files, self.plan_start_end_pos = 
self._append_only_filter_by_shard(files)
+            return files
+
+    def _compute_split_pos(self, splits: List['Split']) -> None:
+        if self.start_pos_of_this_subtask is not None or 
self.idx_of_this_subtask is not None:
+            # When files are combined into splits, it is necessary to find 
files that needs to be divided for each split
+            self._compute_split_start_end_pos(splits, 
self.plan_start_end_pos[0], self.plan_start_end_pos[1])
+
+    def _append_only_filter_by_slice(self, partitioned_files: defaultdict, 
start_pos: int,
+                                     end_pos: int) -> (defaultdict, int, int):
+        plan_start_pos = 0
+        plan_end_pos = 0
+        entry_end_pos = 0  # end row position of current file in all data
+        splits_start_pos = 0
+        filtered_partitioned_files = defaultdict(list)
+        # Iterate through all file entries to find files that overlap with 
current shard range
+        for key, file_entries in partitioned_files.items():
+            filtered_entries = []
+            for entry in file_entries:
+                entry_begin_pos = entry_end_pos  # Starting row position of 
current file in all data
+                entry_end_pos += entry.file.row_count  # Update to row 
position after current file
+
+                # If current file is completely after shard range, stop 
iteration
+                if entry_begin_pos >= end_pos:
+                    break
+                # If current file is completely before shard range, skip it
+                if entry_end_pos <= start_pos:
+                    continue
+                if entry_begin_pos <= start_pos < entry_end_pos:
+                    splits_start_pos = entry_begin_pos
+                    plan_start_pos = start_pos - entry_begin_pos
+                # If shard end position is within current file, record 
relative end position
+                if entry_begin_pos < end_pos <= entry_end_pos:
+                    plan_end_pos = end_pos - splits_start_pos
+                # Add files that overlap with shard range to result
+                filtered_entries.append(entry)
+            if filtered_entries:
+                filtered_partitioned_files[key] = filtered_entries
+
+        return filtered_partitioned_files, (plan_start_pos, plan_end_pos)
+
+    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> 
(defaultdict, int, int):
+        """
+        Filter file entries by shard. Only keep the files within the range, 
which means
+        that only the starting and ending files need to be further divided 
subsequently
+        """
+        total_row = 0
+        # Sort by file creation time to ensure consistent sharding
+        for key, file_entries in partitioned_files.items():
+            for entry in file_entries:
+                total_row += entry.file.row_count
+
+        # Calculate number of rows this shard should process using balanced 
distribution
+        # Distribute remainder evenly among first few shards to avoid last 
shard overload
+        base_rows_per_shard = total_row // self.number_of_para_subtasks
+        remainder = total_row % self.number_of_para_subtasks
+
+        # Each of the first 'remainder' shards gets one extra row
+        if self.idx_of_this_subtask < remainder:
+            num_row = base_rows_per_shard + 1
+            start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
+        else:
+            num_row = base_rows_per_shard
+            start_pos = (remainder * (base_rows_per_shard + 1) +
+                         (self.idx_of_this_subtask - remainder) * 
base_rows_per_shard)
+
+        end_pos = start_pos + num_row
+
+        return self._append_only_filter_by_slice(partitioned_files, start_pos, 
end_pos)
+
+    def _data_evolution_filter_by_slice(self, partitioned_files: defaultdict,
+                                        start_pos: int,
+                                        end_pos: int) -> (defaultdict, int, 
int):
+        plan_start_pos = 0
+        plan_end_pos = 0
+        entry_end_pos = 0  # end row position of current file in all data
+        splits_start_pos = 0
+        filtered_partitioned_files = defaultdict(list)
+        # Iterate through all file entries to find files that overlap with 
current shard range
+        for key, file_entries in partitioned_files.items():
+            filtered_entries = []
+            blob_added = False  # If it is true, all blobs corresponding to 
this data file are added
+            for entry in file_entries:
+                if self._is_blob_file(entry.file.file_name):
+                    if blob_added:
+                        filtered_entries.append(entry)
+                    continue
+                blob_added = False
+                entry_begin_pos = entry_end_pos  # Starting row position of 
current file in all data
+                entry_end_pos += entry.file.row_count  # Update to row 
position after current file
+
+                # If current file is completely after shard range, stop 
iteration
+                if entry_begin_pos >= end_pos:
+                    break
+                # If current file is completely before shard range, skip it
+                if entry_end_pos <= start_pos:
+                    continue
+                if entry_begin_pos <= start_pos < entry_end_pos:
+                    splits_start_pos = entry_begin_pos
+                    plan_start_pos = start_pos - entry_begin_pos
+                # If shard end position is within current file, record 
relative end position
+                if entry_begin_pos < end_pos <= entry_end_pos:
+                    plan_end_pos = end_pos - splits_start_pos
+                # Add files that overlap with shard range to result
+                filtered_entries.append(entry)
+                blob_added = True
+            if filtered_entries:
+                filtered_partitioned_files[key] = filtered_entries
+
+        return filtered_partitioned_files, (plan_start_pos, plan_end_pos)
+
+    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) 
-> (defaultdict, int, int):
+        total_row = 0
+        for key, file_entries in partitioned_files.items():
+            for entry in file_entries:
+                if not self._is_blob_file(entry.file.file_name):
+                    total_row += entry.file.row_count
+
+        # Calculate number of rows this shard should process using balanced 
distribution
+        # Distribute remainder evenly among first few shards to avoid last 
shard overload
+        base_rows_per_shard = total_row // self.number_of_para_subtasks
+        remainder = total_row % self.number_of_para_subtasks
+
+        # Each of the first 'remainder' shards gets one extra row
+        if self.idx_of_this_subtask < remainder:
+            num_row = base_rows_per_shard + 1
+            start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
+        else:
+            num_row = base_rows_per_shard
+            start_pos = (remainder * (base_rows_per_shard + 1) +
+                         (self.idx_of_this_subtask - remainder) * 
base_rows_per_shard)
+
+        end_pos = start_pos + num_row
+        return self._data_evolution_filter_by_slice(partitioned_files, 
start_pos, end_pos)
+
+    def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) 
-> List[ManifestEntry]:
+        filtered_entries = []
+        for entry in file_entries:
+            if entry.bucket % self.number_of_para_subtasks == 
self.idx_of_this_subtask:
+                filtered_entries.append(entry)
+        return filtered_entries
+
+    def _compute_split_start_end_pos(self, splits: List[Split], 
plan_start_pos, plan_end_pos):
+        """
+        Find files that needs to be divided for each split
+        :param splits: splits
+        :param plan_start_pos: plan begin row in all splits data
+        :param plan_end_pos: plan end row in all splits data
+        """
+        file_end_pos = 0  # end row position of current file in all splits data
+
+        for split in splits:
+            cur_split_end_pos = file_end_pos
+            # Compute split_file_idx_map for data files
+            file_end_pos = self._compute_split_file_idx_map(plan_start_pos, 
plan_end_pos,
+                                                            split, 
cur_split_end_pos, False)
+            # Compute split_file_idx_map for blob files
+            if self.data_evolution:
+                self._compute_split_file_idx_map(plan_start_pos, plan_end_pos,
+                                                 split, cur_split_end_pos, 
True)
+
+    def _compute_split_file_idx_map(self, plan_start_pos, plan_end_pos, split: 
Split,
+                                    file_end_pos: int, is_blob: bool = False):
+        """
+        Traverse all the files in current split, find the starting shard and 
ending shard files,
+        and add them to shard_file_idx_map;
+        - for data file, only two data files will be divided in all splits.
+        - for blob file, perhaps there will be some unnecessary files in 
addition to two files(start and end).
+          Add them to shard_file_idx_map as well, because they need to be 
removed later.
+        """
+        row_cnt = 0
+        for file in split.files:
+            if not is_blob and self._is_blob_file(file.file_name):
+                continue
+            if is_blob and not self._is_blob_file(file.file_name):
+                continue
+            row_cnt += file.row_count
+            file_begin_pos = file_end_pos  # Starting row position of current 
file in all data
+            file_end_pos += file.row_count  # Update to row position after 
current file
+            if file_begin_pos <= plan_start_pos < plan_end_pos <= file_end_pos:
+                split.shard_file_idx_map[file.file_name] = (
+                    plan_start_pos - file_begin_pos, plan_end_pos - 
file_begin_pos)
+            # If shard start position is within current file, record actual 
start position and relative offset
+            elif file_begin_pos < plan_start_pos < file_end_pos:
+                split.shard_file_idx_map[file.file_name] = (plan_start_pos - 
file_begin_pos, file.row_count)
+            # If shard end position is within current file, record relative 
end position
+            elif file_begin_pos < plan_end_pos < file_end_pos:
+                split.shard_file_idx_map[file.file_name] = (0, plan_end_pos - 
file_begin_pos)
+            elif file_end_pos <= plan_start_pos or file_begin_pos >= 
plan_end_pos:
+                split.shard_file_idx_map[file.file_name] = (-1, -1)
+        return file_end_pos

Review Comment:
   define a constant for (-1, -1), which is important for other developers to 
understand it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to