This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d559165aa [flink] Improve codes for AlignedContinuousFileSplitEnumerator
d559165aa is described below

commit d559165aacbaa92490311f9e40702ae3dd1cdaec
Author: Jingsong <[email protected]>
AuthorDate: Fri Aug 4 17:03:58 2023 +0800

    [flink] Improve codes for AlignedContinuousFileSplitEnumerator
---
 .../source/ContinuousFileSplitEnumerator.java      | 60 +++++++++-------------
 .../AlignedContinuousFileSplitEnumerator.java      |  7 +--
 .../source/assigners/AlignedSplitAssigner.java     |  6 +--
 .../flink/source/assigners/FIFOSplitAssigner.java  |  2 +-
 .../source/assigners/PreAssignSplitAssigner.java   |  4 +-
 .../flink/source/assigners/SplitAssigner.java      |  2 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  |  4 +-
 7 files changed, 37 insertions(+), 48 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 1e2161fdb..b5ba3d839 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
 import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
 import org.apache.paimon.flink.source.assigners.SplitAssigner;
@@ -70,13 +71,12 @@ public class ContinuousFileSplitEnumerator
 
     protected final SplitAssigner splitAssigner;
 
-    protected final BucketMode bucketMode;
-
     @Nullable protected Long nextSnapshotId;
 
-    protected boolean blockScanByRequest = false;
     protected boolean finished = false;
 
+    private boolean stopTriggerScan = false;
+
     public ContinuousFileSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             Collection<FileStoreSourceSplit> remainSplits,
@@ -91,17 +91,21 @@ public class ContinuousFileSplitEnumerator
         this.readersAwaitingSplit = new LinkedHashSet<>();
         this.splitGenerator = new FileStoreSourceSplitGenerator();
         this.scan = scan;
-        this.bucketMode = bucketMode;
-        this.splitAssigner = createSplitAssigner();
+        this.splitAssigner = createSplitAssigner(bucketMode);
         addSplits(remainSplits);
     }
 
+    @VisibleForTesting
+    void enableTriggerScan() {
+        this.stopTriggerScan = false;
+    }
+
     protected void addSplits(Collection<FileStoreSourceSplit> splits) {
         splits.forEach(this::addSplit);
     }
 
     private void addSplit(FileStoreSourceSplit split) {
-        splitAssigner.addSplit(assignTask(((DataSplit) split.split()).bucket()), split);
+        splitAssigner.addSplit(assignSuggestedTask(split), split);
     }
 
     @Override
@@ -123,7 +127,15 @@ public class ContinuousFileSplitEnumerator
     @Override
     public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
         readersAwaitingSplit.add(subtaskId);
-        doHandleSplitRequest(subtaskId);
+        assignSplits();
+        // if current task assigned no split, we check conditions to scan one more time
+        if (readersAwaitingSplit.contains(subtaskId)) {
+            if (stopTriggerScan || splitAssigner.remainingSplits().size() >= SPLIT_MAX_NUM) {
+                return;
+            }
+            stopTriggerScan = true;
+            context.callAsync(this::scanNextSnapshot, this::processDiscoveredSplits);
+        }
     }
 
     @Override
@@ -176,12 +188,11 @@ public class ContinuousFileSplitEnumerator
         nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
         TableScan.Plan plan = planWithNextSnapshotId.plan;
         if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
-            blockScanByRequest = true;
+            stopTriggerScan = true;
             return;
-        } else {
-            blockScanByRequest = false;
         }
 
+        stopTriggerScan = false;
         if (plan.splits().isEmpty()) {
             return;
         }
@@ -190,18 +201,6 @@ public class ContinuousFileSplitEnumerator
         assignSplits();
     }
 
-    private void doHandleSplitRequest(int taskId) {
-        assignSplits();
-        // if current task assigned no split, we check conditions to scan one more time
-        if (readersAwaitingSplit.contains(taskId)) {
-            if (!shouldInvokeScan()) {
-                return;
-            }
-            blockScanByRequest = true;
-            context.callAsync(this::scanNextSnapshot, this::processDiscoveredSplits);
-        }
-    }
-
     /**
      * Method should be synchronized because {@link #handleSplitRequest} and {@link
      * #processDiscoveredSplits} have thread conflicts.
@@ -237,22 +236,11 @@ public class ContinuousFileSplitEnumerator
         context.assignSplits(new SplitsAssignment<>(assignment));
     }
 
-    private boolean shouldInvokeScan() {
-        return !blockScanByRequest && splitAssigner.remainingSplits().size() <= SPLIT_MAX_NUM;
-    }
-
-    protected int assignTask(int bucket) {
-        if (bucketMode == BucketMode.UNAWARE) {
-            // we just assign task 0 when bucket unaware, because we don't need this.
-            return 0;
-        } else {
-            // if not bucket unaware, we assign the bucket % parallelism, the same bucket data go
-            // into the same task
-            return bucket % context.currentParallelism();
-        }
+    protected int assignSuggestedTask(FileStoreSourceSplit split) {
+        return ((DataSplit) split.split()).bucket() % context.currentParallelism();
     }
 
-    protected SplitAssigner createSplitAssigner() {
+    protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
         return bucketMode == BucketMode.UNAWARE
                 ? new FIFOSplitAssigner(Collections.emptyList())
                 : new PreAssignSplitAssigner(1, context, Collections.emptyList());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index 4ffe36b5e..af9f71f54 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -122,8 +122,9 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
             Collection<FileStoreSourceSplit> splits) {
         Map<Integer, List<FileStoreSourceSplit>> subtaskSplits = new HashMap<>();
         for (FileStoreSourceSplit split : splits) {
-            int taskId = assignTask(((DataSplit) split.split()).bucket());
-            subtaskSplits.computeIfAbsent(taskId, subtask -> new ArrayList<>()).add(split);
+            subtaskSplits
+                    .computeIfAbsent(assignSuggestedTask(split), subtask -> new ArrayList<>())
+                    .add(split);
         }
         return subtaskSplits;
     }
@@ -240,7 +241,7 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
     }
 
     @Override
-    protected SplitAssigner createSplitAssigner() {
+    protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
         return new AlignedSplitAssigner();
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
index 41d6f097b..e1892ec19 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
@@ -72,7 +72,7 @@ public class AlignedSplitAssigner implements SplitAssigner {
     }
 
     @Override
-    public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
+    public void addSplitsBack(int suggestedTask, List<FileStoreSourceSplit> splits) {
         if (splits.isEmpty()) {
             return;
         }
@@ -82,10 +82,10 @@ public class AlignedSplitAssigner implements SplitAssigner {
         PendingSnapshot head = pendingSplitAssignment.peek();
         if (head == null || snapshotId != head.snapshotId) {
             head = new PendingSnapshot(snapshotId, isPlaceholder, new HashMap<>());
-            head.addAll(subtask, splits);
+            head.addAll(suggestedTask, splits);
             pendingSplitAssignment.addFirst(head);
         } else {
-            head.addAll(subtask, splits);
+            head.addAll(suggestedTask, splits);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
index 87289b34d..52cf28f21 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
@@ -49,7 +49,7 @@ public class FIFOSplitAssigner implements SplitAssigner {
     }
 
     @Override
-    public void addSplit(int subtask, FileStoreSourceSplit split) {
+    public void addSplit(int suggestedTask, FileStoreSourceSplit split) {
         pendingSplitAssignment.add(split);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
index 3f45133c3..63e24026f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
@@ -68,8 +68,8 @@ public class PreAssignSplitAssigner implements SplitAssigner {
     }
 
     @Override
-    public void addSplit(int subtask, FileStoreSourceSplit split) {
-        pendingSplitAssignment.computeIfAbsent(subtask, k -> new LinkedList<>()).add(split);
+    public void addSplit(int suggestedTask, FileStoreSourceSplit split) {
+        pendingSplitAssignment.computeIfAbsent(suggestedTask, k -> new LinkedList<>()).add(split);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
index ea0f99804..e89d9fa63 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
@@ -40,7 +40,7 @@ public interface SplitAssigner {
     List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname);
 
     /** Add one split of a specified subtask to the assigner. */
-    void addSplit(int subtask, FileStoreSourceSplit splits);
+    void addSplit(int suggestedTask, FileStoreSourceSplit splits);
 
     /**
      * Adds a set of splits to this assigner. This happens for example when some split processing
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index fc69b2aed..d02cd2b89 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -629,8 +629,8 @@ public class ContinuousFileSplitEnumeratorTest {
         assignedSplits = assignments.get(3).getAssignedSplits();
         assertThat(toDataSplits(assignedSplits)).doesNotContain(splits.get(0));
 
-        // forcely set blockScanByRequest = false, so the split request below will trigger scan
-        enumerator.blockScanByRequest = false;
+        // forcely enable trigger scan, so the split request below will trigger scan
+        enumerator.enableTriggerScan();
         // trigger scan here
         enumerator.handleSplitRequest(3, "test-host");
         context.getExecutorService().triggerAllNonPeriodicTasks();

Reply via email to