This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d559165aa [flink] Improve codes for AlignedContinuousFileSplitEnumerator
d559165aa is described below
commit d559165aacbaa92490311f9e40702ae3dd1cdaec
Author: Jingsong <[email protected]>
AuthorDate: Fri Aug 4 17:03:58 2023 +0800
[flink] Improve codes for AlignedContinuousFileSplitEnumerator
---
.../source/ContinuousFileSplitEnumerator.java | 60 +++++++++-------------
.../AlignedContinuousFileSplitEnumerator.java | 7 +--
.../source/assigners/AlignedSplitAssigner.java | 6 +--
.../flink/source/assigners/FIFOSplitAssigner.java | 2 +-
.../source/assigners/PreAssignSplitAssigner.java | 4 +-
.../flink/source/assigners/SplitAssigner.java | 2 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 4 +-
7 files changed, 37 insertions(+), 48 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 1e2161fdb..b5ba3d839 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
@@ -70,13 +71,12 @@ public class ContinuousFileSplitEnumerator
protected final SplitAssigner splitAssigner;
- protected final BucketMode bucketMode;
-
@Nullable protected Long nextSnapshotId;
- protected boolean blockScanByRequest = false;
protected boolean finished = false;
+ private boolean stopTriggerScan = false;
+
public ContinuousFileSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> remainSplits,
@@ -91,17 +91,21 @@ public class ContinuousFileSplitEnumerator
this.readersAwaitingSplit = new LinkedHashSet<>();
this.splitGenerator = new FileStoreSourceSplitGenerator();
this.scan = scan;
- this.bucketMode = bucketMode;
- this.splitAssigner = createSplitAssigner();
+ this.splitAssigner = createSplitAssigner(bucketMode);
addSplits(remainSplits);
}
+ @VisibleForTesting
+ void enableTriggerScan() {
+ this.stopTriggerScan = false;
+ }
+
protected void addSplits(Collection<FileStoreSourceSplit> splits) {
splits.forEach(this::addSplit);
}
private void addSplit(FileStoreSourceSplit split) {
- splitAssigner.addSplit(assignTask(((DataSplit)
split.split()).bucket()), split);
+ splitAssigner.addSplit(assignSuggestedTask(split), split);
}
@Override
@@ -123,7 +127,15 @@ public class ContinuousFileSplitEnumerator
@Override
public void handleSplitRequest(int subtaskId, @Nullable String
requesterHostname) {
readersAwaitingSplit.add(subtaskId);
- doHandleSplitRequest(subtaskId);
+ assignSplits();
+ // if current task assigned no split, we check conditions to scan one
more time
+ if (readersAwaitingSplit.contains(subtaskId)) {
+ if (stopTriggerScan || splitAssigner.remainingSplits().size() >=
SPLIT_MAX_NUM) {
+ return;
+ }
+ stopTriggerScan = true;
+ context.callAsync(this::scanNextSnapshot,
this::processDiscoveredSplits);
+ }
}
@Override
@@ -176,12 +188,11 @@ public class ContinuousFileSplitEnumerator
nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
TableScan.Plan plan = planWithNextSnapshotId.plan;
if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
- blockScanByRequest = true;
+ stopTriggerScan = true;
return;
- } else {
- blockScanByRequest = false;
}
+ stopTriggerScan = false;
if (plan.splits().isEmpty()) {
return;
}
@@ -190,18 +201,6 @@ public class ContinuousFileSplitEnumerator
assignSplits();
}
- private void doHandleSplitRequest(int taskId) {
- assignSplits();
- // if current task assigned no split, we check conditions to scan one
more time
- if (readersAwaitingSplit.contains(taskId)) {
- if (!shouldInvokeScan()) {
- return;
- }
- blockScanByRequest = true;
- context.callAsync(this::scanNextSnapshot,
this::processDiscoveredSplits);
- }
- }
-
/**
* Method should be synchronized because {@link #handleSplitRequest} and
{@link
* #processDiscoveredSplits} have thread conflicts.
@@ -237,22 +236,11 @@ public class ContinuousFileSplitEnumerator
context.assignSplits(new SplitsAssignment<>(assignment));
}
- private boolean shouldInvokeScan() {
- return !blockScanByRequest && splitAssigner.remainingSplits().size()
<= SPLIT_MAX_NUM;
- }
-
- protected int assignTask(int bucket) {
- if (bucketMode == BucketMode.UNAWARE) {
- // we just assign task 0 when bucket unaware, because we don't
need this.
- return 0;
- } else {
- // if not bucket unaware, we assign the bucket % parallelism, the
same bucket data go
- // into the same task
- return bucket % context.currentParallelism();
- }
+ protected int assignSuggestedTask(FileStoreSourceSplit split) {
+ return ((DataSplit) split.split()).bucket() %
context.currentParallelism();
}
- protected SplitAssigner createSplitAssigner() {
+ protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
return bucketMode == BucketMode.UNAWARE
? new FIFOSplitAssigner(Collections.emptyList())
: new PreAssignSplitAssigner(1, context,
Collections.emptyList());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index 4ffe36b5e..af9f71f54 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -122,8 +122,9 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
Collection<FileStoreSourceSplit> splits) {
Map<Integer, List<FileStoreSourceSplit>> subtaskSplits = new
HashMap<>();
for (FileStoreSourceSplit split : splits) {
- int taskId = assignTask(((DataSplit) split.split()).bucket());
- subtaskSplits.computeIfAbsent(taskId, subtask -> new
ArrayList<>()).add(split);
+ subtaskSplits
+ .computeIfAbsent(assignSuggestedTask(split), subtask ->
new ArrayList<>())
+ .add(split);
}
return subtaskSplits;
}
@@ -240,7 +241,7 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
}
@Override
- protected SplitAssigner createSplitAssigner() {
+ protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
return new AlignedSplitAssigner();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
index 41d6f097b..e1892ec19 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
@@ -72,7 +72,7 @@ public class AlignedSplitAssigner implements SplitAssigner {
}
@Override
- public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
+ public void addSplitsBack(int suggestedTask, List<FileStoreSourceSplit>
splits) {
if (splits.isEmpty()) {
return;
}
@@ -82,10 +82,10 @@ public class AlignedSplitAssigner implements SplitAssigner {
PendingSnapshot head = pendingSplitAssignment.peek();
if (head == null || snapshotId != head.snapshotId) {
head = new PendingSnapshot(snapshotId, isPlaceholder, new
HashMap<>());
- head.addAll(subtask, splits);
+ head.addAll(suggestedTask, splits);
pendingSplitAssignment.addFirst(head);
} else {
- head.addAll(subtask, splits);
+ head.addAll(suggestedTask, splits);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
index 87289b34d..52cf28f21 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
@@ -49,7 +49,7 @@ public class FIFOSplitAssigner implements SplitAssigner {
}
@Override
- public void addSplit(int subtask, FileStoreSourceSplit split) {
+ public void addSplit(int suggestedTask, FileStoreSourceSplit split) {
pendingSplitAssignment.add(split);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
index 3f45133c3..63e24026f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
@@ -68,8 +68,8 @@ public class PreAssignSplitAssigner implements SplitAssigner {
}
@Override
- public void addSplit(int subtask, FileStoreSourceSplit split) {
- pendingSplitAssignment.computeIfAbsent(subtask, k -> new
LinkedList<>()).add(split);
+ public void addSplit(int suggestedTask, FileStoreSourceSplit split) {
+ pendingSplitAssignment.computeIfAbsent(suggestedTask, k -> new
LinkedList<>()).add(split);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
index ea0f99804..e89d9fa63 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
@@ -40,7 +40,7 @@ public interface SplitAssigner {
List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname);
/** Add one split of a specified subtask to the assigner. */
- void addSplit(int subtask, FileStoreSourceSplit splits);
+ void addSplit(int suggestedTask, FileStoreSourceSplit splits);
/**
* Adds a set of splits to this assigner. This happens for example when
some split processing
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index fc69b2aed..d02cd2b89 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -629,8 +629,8 @@ public class ContinuousFileSplitEnumeratorTest {
assignedSplits = assignments.get(3).getAssignedSplits();
assertThat(toDataSplits(assignedSplits)).doesNotContain(splits.get(0));
- // forcely set blockScanByRequest = false, so the split request below
will trigger scan
- enumerator.blockScanByRequest = false;
+ // forcely enable trigger scan, so the split request below will
trigger scan
+ enumerator.enableTriggerScan();
// trigger scan here
enumerator.handleSplitRequest(3, "test-host");
context.getExecutorService().triggerAllNonPeriodicTasks();