xushiyan commented on code in PR #18082:
URL: https://github.com/apache/hudi/pull/18082#discussion_r2766478121


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitProvider.java:
##########
@@ -37,29 +40,45 @@ public class DefaultHoodieSplitProvider implements HoodieSplitProvider {
   public static final int DEFAULT_SPLIT_QUEUE_SIZE = 20;
 
   private Queue<HoodieSourceSplit> pendingSplits;
+  private HoodieSplitAssigner splitAssigner;
 
-  public DefaultHoodieSplitProvider() {
+  public DefaultHoodieSplitProvider(HoodieSplitAssigner splitAssigner) {
     this.pendingSplits = new ConcurrentLinkedDeque<>();
+    this.splitAssigner = splitAssigner;
   }
 
   /**
    * Creates a DefaultHoodieSplitProvider with a custom comparator for ordering splits.
    *
-   * @param comparator the comparator to use for ordering splits
+   * @param comparator the comparator to use for ordering splits (must not be null)
+   * @param splitAssigner the assigner that assigns each split to a task ID (must not be null)
    */
-  public DefaultHoodieSplitProvider(SerializableComparator<HoodieSourceSplit> comparator) {
-    ValidationUtils.checkArgument(comparator != null,
-        "The hoodie source split comparator can't be null");
+  public DefaultHoodieSplitProvider(
+          SerializableComparator<HoodieSourceSplit> comparator,
+          HoodieSplitAssigner splitAssigner) {
     this.pendingSplits = new PriorityBlockingQueue<>(DEFAULT_SPLIT_QUEUE_SIZE, comparator);
+    this.splitAssigner = splitAssigner;
   }
 
   @Override
-  public Option<HoodieSourceSplit> getNext(@Nullable String hostname) {
-    if (!pendingSplits.isEmpty()) {
-      return Option.of(pendingSplits.poll());
+  public Option<HoodieSourceSplit> getNext(int subTaskId, @Nullable String hostname) {
+    List<HoodieSourceSplit> visited = new ArrayList<>();
+    HoodieSourceSplit targetSplit = null;
+    while (!pendingSplits.isEmpty()) {
+      HoodieSourceSplit split = pendingSplits.poll();
+      int target = splitAssigner.assign(split);
+      if (subTaskId != target) {
+        visited.add(split);
+      } else {
+        targetSplit = split;
+        break;
+      }
     }
 
-    return Option.empty();
+    if (!visited.isEmpty()) {
+      pendingSplits.addAll(visited);

Review Comment:
   This implementation makes getNext() O(n): each call polls the queue until it
   finds a split assigned to the requesting subtask, then re-adds every split it
   skipped. Consider a data structure keyed by subtask ID to handle per-subtask
   assignment, e.g. a multimap.
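
   A minimal sketch of the multimap idea, not a drop-in replacement for the
   provider. It assumes the target subtask of a split is stable, so the assigner
   can be evaluated once when the split is added instead of on every getNext()
   call. The class name `PerSubtaskSplitQueues`, the generic type `S`, and the
   use of `ToIntFunction` in place of `HoodieSplitAssigner` are hypothetical,
   chosen only to keep the example self-contained.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.function.ToIntFunction;

/** Hypothetical sketch: buckets pending splits by their target subtask ID. */
class PerSubtaskSplitQueues<S> {
  // One FIFO queue per subtask ID; acts as a simple multimap.
  private final Map<Integer, Queue<S>> pendingBySubtask = new HashMap<>();
  // Stand-in for the split assigner: maps a split to its target subtask ID.
  private final ToIntFunction<S> assigner;

  PerSubtaskSplitQueues(ToIntFunction<S> assigner) {
    this.assigner = assigner;
  }

  /** Evaluates the assigner once and stores the split in the matching bucket. */
  synchronized void add(S split) {
    int target = assigner.applyAsInt(split);
    pendingBySubtask.computeIfAbsent(target, k -> new ArrayDeque<>()).add(split);
  }

  /** Returns the next split for the given subtask without scanning other buckets. */
  synchronized Optional<S> getNext(int subTaskId) {
    Queue<S> queue = pendingBySubtask.get(subTaskId);
    if (queue == null) {
      return Optional.empty();
    }
    S split = queue.poll();
    if (queue.isEmpty()) {
      // Drop empty buckets so the map does not grow with idle subtask IDs.
      pendingBySubtask.remove(subTaskId);
    }
    return Optional.ofNullable(split);
  }
}
```

   With this layout getNext() is an expected O(1) map lookup plus a poll. If the
   comparator-based ordering must be kept, each bucket could be a PriorityQueue
   instead of an ArrayDeque, which makes the poll O(log k) in the bucket size
   rather than O(n) in the total number of pending splits.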



