JingsongLi commented on code in PR #6274:
URL: https://github.com/apache/paimon/pull/6274#discussion_r2367316211


##########
paimon-python/pypaimon/read/table_scan.py:
##########
@@ -185,38 +248,59 @@ def _filter_by_stats(self, file_entry: ManifestEntry) -> bool:
         })
 
     def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
-        if not file_entries:
-            return []
+        if self.idx_of_this_subtask is not None:
+            # Sort by file creation time to ensure consistent sharding
+            file_entries.sort(key=lambda x: x.file.creation_time)
 
-        data_files: List[DataFileMeta] = [e.file for e in file_entries]
+        partitioned_split = defaultdict(list)
+        for entry in file_entries:
+            partitioned_split[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
         def weight_func(f: DataFileMeta) -> int:
             return max(f.file_size, self.open_file_cost)
 
-        packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func, self.target_split_size)
-        return self._build_split_from_pack(packed_files, file_entries, False)
+        splits = []
+        for key, file_entries in partitioned_split.items():
+            if not file_entries:
+                return []
 
-    def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
-        if not file_entries:
-            return []
+            data_files: List[DataFileMeta] = [e.file for e in file_entries]
 
-        data_files: List[DataFileMeta] = [e.file for e in file_entries]
-        partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
-        sections: List[List[DataFileMeta]] = [
-            [file for s in sl for file in s.files]
-            for sl in partition_sort_runs
-        ]
+            packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func,
+                                                                            self.target_split_size)
+            splits += self._build_split_from_pack(packed_files, file_entries, False)
+        if self.idx_of_this_subtask is not None:
+            splits = self._filter_by_shard(splits)
+        return splits
+
+    def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
+        partitioned_split = defaultdict(list)

Review Comment:
   The PK table also needs to support sharding.
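
   A minimal sketch of what that could look like, mirroring the sharding logic from the append-only path above. This assumes the existing helpers (`_filter_by_shard`, `_build_split_from_pack`, `IntervalPartition`, `SortedRun`) behave as in the current code; the `True` flag passed to `_build_split_from_pack` is a guess, since the original primary-key call site is not visible in this hunk:

   ```python
   def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
       if self.idx_of_this_subtask is not None:
           # Sort by file creation time for deterministic sharding,
           # as in _create_append_only_splits.
           file_entries.sort(key=lambda x: x.file.creation_time)

       partitioned_split = defaultdict(list)
       for entry in file_entries:
           partitioned_split[(tuple(entry.partition.values), entry.bucket)].append(entry)

       splits = []
       for _key, entries in partitioned_split.items():
           data_files: List[DataFileMeta] = [e.file for e in entries]
           # Files with overlapping key ranges must stay in one section
           # so they can be merge-read together.
           partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
           sections: List[List[DataFileMeta]] = [
               [file for s in sl for file in s.files]
               for sl in partition_sort_runs
           ]
           # Assumed signature: same helper as the append-only path.
           splits += self._build_split_from_pack(sections, entries, True)

       # Reuse the same shard filter as the append-only path.
       if self.idx_of_this_subtask is not None:
           splits = self._filter_by_shard(splits)
       return splits
   ```

   Unlike the append-only path, the sections from `IntervalPartition` are used as the packs directly, so files with overlapping key ranges never get split across shards.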


