This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 55b819407 [flink] Stream read snapshot slow while reading from
history, waiting for time interval to trigger stream scan (#1704)
55b819407 is described below
commit 55b8194076b2c00f23d097efaac1935acbf11fd7
Author: YeJunHao <[email protected]>
AuthorDate: Fri Aug 4 16:37:02 2023 +0800
[flink] Stream read snapshot slow while reading from history, waiting for
time interval to trigger stream scan (#1704)
---
.../source/ContinuousFileSplitEnumerator.java | 65 ++++++---
.../source/ContinuousFileSplitEnumeratorTest.java | 154 ++++++++++++++++++++-
2 files changed, 197 insertions(+), 22 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 46bc71e5c..1e2161fdb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
@@ -55,6 +56,7 @@ public class ContinuousFileSplitEnumerator
implements SplitEnumerator<FileStoreSourceSplit,
PendingSplitsCheckpoint> {
private static final Logger LOG =
LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
+ private static final int SPLIT_MAX_NUM = 5_000;
protected final SplitEnumeratorContext<FileStoreSourceSplit> context;
@@ -72,6 +74,7 @@ public class ContinuousFileSplitEnumerator
@Nullable protected Long nextSnapshotId;
+ protected boolean blockScanByRequest = false;
protected boolean finished = false;
public ContinuousFileSplitEnumerator(
@@ -120,7 +123,7 @@ public class ContinuousFileSplitEnumerator
@Override
public void handleSplitRequest(int subtaskId, @Nullable String
requesterHostname) {
readersAwaitingSplit.add(subtaskId);
- assignSplits();
+ doHandleSplitRequest(subtaskId);
}
@Override
@@ -146,12 +149,16 @@ public class ContinuousFileSplitEnumerator
// ------------------------------------------------------------------------
- protected PlanWithNextSnapshotId scanNextSnapshot() {
+ // This needs to be synchronized because the scan object is not thread safe, and both
handleSplitRequest and
+ // context.callAsync can invoke it.
+ protected synchronized PlanWithNextSnapshotId scanNextSnapshot() {
TableScan.Plan plan = scan.plan();
Long nextSnapshotId = scan.checkpoint();
return new PlanWithNextSnapshotId(plan, nextSnapshotId);
}
+ // This method does not need to be synchronized, because it runs in the
coordinator thread, which already
+ // serializes its calls.
protected void processDiscoveredSplits(
PlanWithNextSnapshotId planWithNextSnapshotId, Throwable error) {
if (error != null) {
@@ -168,6 +175,12 @@ public class ContinuousFileSplitEnumerator
nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
TableScan.Plan plan = planWithNextSnapshotId.plan;
+ if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
+ blockScanByRequest = true;
+ return;
+ } else {
+ blockScanByRequest = false;
+ }
if (plan.splits().isEmpty()) {
return;
@@ -177,12 +190,39 @@ public class ContinuousFileSplitEnumerator
assignSplits();
}
+ private void doHandleSplitRequest(int taskId) {
+ assignSplits();
+ // If the current task was assigned no split, check whether we should scan
one more time.
+ if (readersAwaitingSplit.contains(taskId)) {
+ if (!shouldInvokeScan()) {
+ return;
+ }
+ blockScanByRequest = true;
+ context.callAsync(this::scanNextSnapshot,
this::processDiscoveredSplits);
+ }
+ }
+
/**
* Method should be synchronized because {@link #handleSplitRequest} and
{@link
* #processDiscoveredSplits} have thread conflicts.
*/
protected synchronized void assignSplits() {
- Map<Integer, List<FileStoreSourceSplit>> assignment =
createAssignment();
+ // create assignment
+ Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
+ Iterator<Integer> readersAwait = readersAwaitingSplit.iterator();
+ while (readersAwait.hasNext()) {
+ Integer task = readersAwait.next();
+ if (!context.registeredReaders().containsKey(task)) {
+ readersAwait.remove();
+ continue;
+ }
+ List<FileStoreSourceSplit> splits = splitAssigner.getNext(task,
null);
+ if (splits.size() > 0) {
+ assignment.put(task, splits);
+ }
+ }
+
+ // remove readers who fetched splits
if (noMoreSplits()) {
Iterator<Integer> iterator = readersAwaitingSplit.iterator();
while (iterator.hasNext()) {
@@ -197,26 +237,13 @@ public class ContinuousFileSplitEnumerator
context.assignSplits(new SplitsAssignment<>(assignment));
}
- private Map<Integer, List<FileStoreSourceSplit>> createAssignment() {
- Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
- Iterator<Integer> readersAwait = readersAwaitingSplit.iterator();
- while (readersAwait.hasNext()) {
- Integer task = readersAwait.next();
- if (!context.registeredReaders().containsKey(task)) {
- readersAwait.remove();
- continue;
- }
- List<FileStoreSourceSplit> splits = splitAssigner.getNext(task,
null);
- if (splits.size() > 0) {
- assignment.put(task, splits);
- }
- }
- return assignment;
+ private boolean shouldInvokeScan() {
+ return !blockScanByRequest && splitAssigner.remainingSplits().size()
<= SPLIT_MAX_NUM;
}
protected int assignTask(int bucket) {
if (bucketMode == BucketMode.UNAWARE) {
- // we just assign task 0 when bucket unaware
+ // We just assign task 0 when the bucket mode is unaware, because bucket-based
assignment is not needed.
return 0;
} else {
// if not bucket unaware, we assign the bucket % parallelism, the
same bucket data go
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index ce4acc286..fc69b2aed 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
@@ -200,7 +201,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(1, "test-host");
TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
- StreamTableScan scan = new MockScan(results);
+ MockScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
.setSplitEnumeratorContext(context)
@@ -378,7 +379,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(3, "test-host");
TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
- StreamTableScan scan = new MockScan(results);
+ MockScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
.setSplitEnumeratorContext(context)
@@ -390,6 +391,7 @@ public class ContinuousFileSplitEnumeratorTest {
enumerator.start();
// assign to task 0, but no assigned. add to wait list
+ scan.allowEnd(false);
enumerator.handleSplitRequest(0, "test-host");
Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
context.getSplitAssignments();
@@ -501,6 +503,143 @@ public class ContinuousFileSplitEnumeratorTest {
.doesNotThrowAnyException();
}
+ @Test
+ public void testTriggerScanByTaskRequest() throws Exception {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(2);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ MockScan scan = new MockScan(results);
+ scan.allowEnd(false);
+ ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .build();
+ enumerator.start();
+
+ long snapshot = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+ }
+ results.put(1L, new DataFilePlan(splits));
+
+ // request directly
+ enumerator.handleSplitRequest(0, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0);
+ List<FileStoreSourceSplit> assignedSplits =
assignments.get(0).getAssignedSplits();
+
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(0));
+
+ enumerator.handleSplitRequest(1, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1);
+ assignedSplits = assignments.get(1).getAssignedSplits();
+
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(1));
+ }
+
+ @Test
+ public void testNoTriggerWhenReadLatest() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(4);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+ context.registerReader(2, "test-host");
+ context.registerReader(3, "test-host");
+
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ MockScan scan = new MockScan(results);
+ scan.allowEnd(false);
+ ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .build();
+ enumerator.start();
+ enumerator.handleSplitRequest(0, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+
+ long snapshot = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+ }
+ results.put(1L, new DataFilePlan(splits));
+
+ // will not trigger scan here
+ enumerator.handleSplitRequest(0, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments).isEmpty();
+
+ enumerator.handleSplitRequest(1, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).isEmpty();
+
+ // trigger all actions, we will scan anyway
+ context.triggerAllActions();
+
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1);
+ List<FileStoreSourceSplit> assignedSplits =
assignments.get(0).getAssignedSplits();
+
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(0));
+ assignedSplits = assignments.get(1).getAssignedSplits();
+
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(1));
+
+ splits.clear();
+ for (int i = 2; i < 4; i++) {
+ splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+ }
+ results.put(2L, new DataFilePlan(splits));
+ // because blockScanByRequest = false, this request will trigger a
scan
+ enumerator.handleSplitRequest(2, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+ enumerator.handleSplitRequest(3, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
+ assignedSplits = assignments.get(2).getAssignedSplits();
+
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(0));
+ assignedSplits = assignments.get(3).getAssignedSplits();
+
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(1));
+
+ // this will trigger a scan and then set blockScanByRequest = true
+ enumerator.handleSplitRequest(3, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+ splits.clear();
+ splits.add(createDataSplit(snapshot, 7, Collections.emptyList()));
+ results.put(3L, new DataFilePlan(splits));
+
+ // this won't trigger a scan, because blockScanByRequest = true
+ enumerator.handleSplitRequest(3, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+ assignments = context.getSplitAssignments();
+ assignedSplits = assignments.get(3).getAssignedSplits();
+ assertThat(toDataSplits(assignedSplits)).doesNotContain(splits.get(0));
+
+ // forcibly set blockScanByRequest = false, so the split request below
will trigger a scan
+ enumerator.blockScanByRequest = false;
+ // trigger scan here
+ enumerator.handleSplitRequest(3, "test-host");
+ context.getExecutorService().triggerAllNonPeriodicTasks();
+ assignments = context.getSplitAssignments();
+ assignedSplits = assignments.get(3).getAssignedSplits();
+ // get expected split
+ assertThat(toDataSplits(assignedSplits)).contains(splits.get(0));
+ }
+
@Test
public void testEnumeratorWithCheckpoint() {
final TestingAsyncSplitEnumeratorContext<FileStoreSourceSplit> context
=
@@ -652,6 +791,7 @@ public class ContinuousFileSplitEnumeratorTest {
private final TreeMap<Long, Plan> results;
private @Nullable Long nextSnapshotId;
+ private boolean allowEnd = true;
public MockScan(TreeMap<Long, Plan> results) {
this.results = results;
@@ -662,7 +802,11 @@ public class ContinuousFileSplitEnumeratorTest {
public Plan plan() {
Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
if (planEntry == null) {
- throw new EndOfScanException();
+ if (allowEnd) {
+ throw new EndOfScanException();
+ } else {
+ return SnapshotNotExistPlan.INSTANCE;
+ }
}
nextSnapshotId = planEntry.getKey() + 1;
return planEntry.getValue();
@@ -689,6 +833,10 @@ public class ContinuousFileSplitEnumeratorTest {
@Override
public void restore(Long state) {}
+
+ public void allowEnd(boolean allowEnd) {
+ this.allowEnd = allowEnd;
+ }
}
private static class TestingAsyncSplitEnumeratorContext<SplitT extends
SourceSplit>