XiaoHongbo-Hope commented on code in PR #7032:
URL: https://github.com/apache/paimon/pull/7032#discussion_r2688860910


##########
paimon-python/pypaimon/read/scanner/full_starting_scanner.py:
##########
@@ -753,3 +522,263 @@ def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]:
                         result.append(file)
 
         return result
+
+    def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool:
+        # Treat a missing delete_row_count (None) as "no deletes" to stay compatible with older versions
+        if data_file_meta.delete_row_count is None:
+            return True
+        return data_file_meta.delete_row_count == 0
+
+    def _partial_read(self):
+        # The full scanner never reads a partial row range.
+        return False
+
+    def _filter_by_pos(self, files):
+        # No-op here; PartialStartingScanner overrides this to filter files by row position.
+        pass
+
+    def _compute_split_pos(self, splits: List['Split']) -> None:
+        # No-op here; PartialStartingScanner overrides this to compute per-split positions.
+        pass
+
+
+class PartialStartingScanner(FullStartingScanner):
+    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
+        super().__init__(table, predicate, limit)
+        # shard / slice state for this subtask
+        self.idx_of_this_subtask = None
+        self.number_of_para_subtasks = None
+        self.start_pos_of_this_subtask = None
+        self.end_pos_of_this_subtask = None
+        self.plan_start_end_pos = None
+
+    def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
+        if idx_of_this_subtask >= number_of_para_subtasks:
+            raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
+        if self.start_pos_of_this_subtask is not None:
+            raise Exception("with_shard and with_slice cannot be used simultaneously")
+        self.idx_of_this_subtask = idx_of_this_subtask
+        self.number_of_para_subtasks = number_of_para_subtasks
+        return self
+
+    def with_slice(self, start_pos, end_pos) -> 'FullStartingScanner':
+        if start_pos >= end_pos:
+            raise Exception("start_pos must be less than end_pos")
+        if self.idx_of_this_subtask is not None:
+            raise Exception("with_slice and with_shard cannot be used 
simultaneously")
+        self.start_pos_of_this_subtask = start_pos
+        self.end_pos_of_this_subtask = end_pos
+        return self
+
+    def _filter_by_pos(self, files):
+        if self.table.is_primary_key_table:
+            return self._primary_key_filter_by_shard(files)
+        elif self.data_evolution:
+            if self.start_pos_of_this_subtask is not None:
+                # shard data range: [plan_start_pos, plan_end_pos)
+                files, self.plan_start_end_pos = \
+                    self._data_evolution_filter_by_slice(files,
+                                                         self.start_pos_of_this_subtask,
+                                                         self.end_pos_of_this_subtask)
+            elif self.idx_of_this_subtask is not None:
+                files, self.plan_start_end_pos = self._data_evolution_filter_by_shard(files)
+            return files
+        else:
+            if self.start_pos_of_this_subtask is not None:
+                # shard data range: [plan_start_pos, plan_end_pos)
+                files, self.plan_start_end_pos = \
+                    self._append_only_filter_by_slice(files,
+                                                      self.start_pos_of_this_subtask,
+                                                      self.end_pos_of_this_subtask)
+            elif self.idx_of_this_subtask is not None:
+                files, self.plan_start_end_pos = self._append_only_filter_by_shard(files)
+            return files
+
+    def _compute_split_pos(self, splits: List['Split']) -> None:
+        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
+            # When files are combined into splits, find the files that need to be divided for each split
+            self._compute_split_start_end_pos(splits, self.plan_start_end_pos[0], self.plan_start_end_pos[1])
+
+    def _append_only_filter_by_slice(self, partitioned_files: defaultdict, start_pos: int,
+                                     end_pos: int) -> Tuple[defaultdict, Tuple[int, int]]:
+        plan_start_pos = 0
+        plan_end_pos = 0
+        entry_end_pos = 0  # end row position of current file in all data
+        splits_start_pos = 0
+        filtered_partitioned_files = defaultdict(list)
+        # Iterate through all file entries to find files that overlap with the current shard range
+        for key, file_entries in partitioned_files.items():
+            filtered_entries = []
+            for entry in file_entries:
+                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
+                entry_end_pos += entry.file.row_count  # Update to row position after current file
+
+                # If current file is completely after shard range, stop iteration
+                if entry_begin_pos >= end_pos:
+                    break
+                # If current file is completely before shard range, skip it
+                if entry_end_pos <= start_pos:
+                    continue
+                # If shard start position is within current file, record relative start position
+                if entry_begin_pos <= start_pos < entry_end_pos:
+                    splits_start_pos = entry_begin_pos
+                    plan_start_pos = start_pos - entry_begin_pos
+                # If shard end position is within current file, record relative end position
+                if entry_begin_pos < end_pos <= entry_end_pos:
+                    plan_end_pos = end_pos - splits_start_pos
+                # Add files that overlap with shard range to result
+                filtered_entries.append(entry)
+            if filtered_entries:
+                filtered_partitioned_files[key] = filtered_entries
+
+        return filtered_partitioned_files, (plan_start_pos, plan_end_pos)
+
+    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> Tuple[defaultdict, Tuple[int, int]]:
+        """

Review Comment:
   This is similar to the code in `_data_evolution_filter_by_shard`; can you refactor to share it?
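
One possible shape for that refactor, as a minimal sketch: both filters walk the file entries in order, accumulate row positions, and keep the entries that overlap a `[start_pos, end_pos)` range, so the shared walk could be lifted into a helper parameterized by how a file's row count is read. The helper name `_filter_by_row_range` and the `row_count_of` parameter are hypothetical, and since `_data_evolution_filter_by_shard` is not shown in this diff, the exact accessor it would pass is an assumption:

```python
from collections import defaultdict
from typing import Callable, DefaultDict, Tuple


def _filter_by_row_range(partitioned_files: DefaultDict,
                         start_pos: int,
                         end_pos: int,
                         row_count_of: Callable) -> Tuple[DefaultDict, Tuple[int, int]]:
    """Keep only file entries overlapping [start_pos, end_pos); return them
    with the plan (start, end) positions relative to the first kept file."""
    plan_start_pos = 0
    plan_end_pos = 0
    entry_end_pos = 0  # running end row position across all files
    splits_start_pos = 0
    filtered = defaultdict(list)
    for key, file_entries in partitioned_files.items():
        kept = []
        for entry in file_entries:
            entry_begin_pos = entry_end_pos
            entry_end_pos += row_count_of(entry)
            if entry_begin_pos >= end_pos:
                break  # this and later entries in this partition start after the range
            if entry_end_pos <= start_pos:
                continue  # entirely before the range
            if entry_begin_pos <= start_pos < entry_end_pos:
                splits_start_pos = entry_begin_pos
                plan_start_pos = start_pos - entry_begin_pos
            if entry_begin_pos < end_pos <= entry_end_pos:
                plan_end_pos = end_pos - splits_start_pos
            kept.append(entry)
        if kept:
            filtered[key] = kept
    return filtered, (plan_start_pos, plan_end_pos)


# Call sites inside PartialStartingScanner would then reduce to, e.g.:
#   return _filter_by_row_range(files, start, end, lambda e: e.file.row_count)
```

For concreteness, a small worked example of the position bookkeeping, reusing the helper above with `SimpleNamespace` stand-ins for the real file-entry objects:

```python
from collections import defaultdict
from types import SimpleNamespace

# Three files of 100, 50 and 200 rows cover global row ranges
# [0, 100), [100, 150) and [150, 350).
files = defaultdict(list)
files["p0"] = [SimpleNamespace(file=SimpleNamespace(row_count=n))
               for n in (100, 50, 200)]

# Slice [120, 260): the first file ends at row 100 <= 120 and is dropped;
# the second and third overlap the range and are kept.
kept, (plan_start, plan_end) = _filter_by_row_range(
    files, 120, 260, lambda e: e.file.row_count)

assert len(kept["p0"]) == 2
# Positions are relative to the first kept file, which starts at global row 100.
assert (plan_start, plan_end) == (20, 160)
```

The returned positions are relative to the first kept file, which appears to be what `_compute_split_start_end_pos` later consumes via `plan_start_end_pos`.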



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
