This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f480fd9a9 [flink] fix the issue that the historical snapshot may be
lost when Enumerator performs snapshotState. (#1562)
f480fd9a9 is described below
commit f480fd9a92818e78e043e108f77ac95ea7f2c1d8
Author: liming.1018 <[email protected]>
AuthorDate: Thu Jul 13 17:22:35 2023 +0800
[flink] fix the issue that the historical snapshot may be lost when
Enumerator performs snapshotState. (#1562)
---
.../source/ContinuousFileSplitEnumerator.java | 25 +++-
.../source/ContinuousFileSplitEnumeratorTest.java | 160 ++++++++++++++++++---
2 files changed, 160 insertions(+), 25 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 14c5c39af..0c695ebff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -106,7 +106,8 @@ public class ContinuousFileSplitEnumerator
@Override
public void start() {
- context.callAsync(scan::plan, this::processDiscoveredSplits, 0,
discoveryInterval);
+ context.callAsync(
+ this::scanNextSnapshot, this::processDiscoveredSplits, 0,
discoveryInterval);
}
@Override
@@ -148,7 +149,14 @@ public class ContinuousFileSplitEnumerator
// ------------------------------------------------------------------------
- private void processDiscoveredSplits(TableScan.Plan plan, Throwable error)
{
+ /**
+ * Runs one scan step: plans the next splits and reads {@code scan.checkpoint()} right
+ * after that plan, returning both together. This is the callable passed to
+ * {@code context.callAsync} in {@link #start()}. Its result is consumed by
+ * {@code processDiscoveredSplits}, which takes {@code nextSnapshotId} from this result
+ * instead of calling {@code scan.checkpoint()} itself. That way the snapshot id later
+ * written by {@code snapshotState} always matches the plan that has actually been
+ * processed, even if further {@code plan()} calls have already run in the meantime.
+ */
+ private PlanWithNextSnapshotId scanNextSnapshot() {
+ TableScan.Plan plan = scan.plan();
+ Long nextSnapshotId = scan.checkpoint();
+ return new PlanWithNextSnapshotId(plan, nextSnapshotId);
+ }
+
+ private void processDiscoveredSplits(
+ PlanWithNextSnapshotId planWithNextSnapshotId, Throwable error) {
if (error != null) {
if (error instanceof EndOfScanException) {
// finished
@@ -161,7 +169,8 @@ public class ContinuousFileSplitEnumerator
return;
}
- nextSnapshotId = scan.checkpoint();
+ nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
+ TableScan.Plan plan = planWithNextSnapshotId.plan;
if (plan.splits().isEmpty()) {
return;
@@ -218,4 +227,14 @@ public class ContinuousFileSplitEnumerator
return bucket % context.currentParallelism();
}
}
+
+ /**
+ * Result of one scan step: a {@link TableScan.Plan} paired with the value that
+ * {@code scan.checkpoint()} returned immediately after that plan was produced.
+ */
+ private static class PlanWithNextSnapshotId {
+ private final TableScan.Plan plan;
+ // Stored exactly as returned by scan.checkpoint() (boxed Long).
+ private final Long nextSnapshotId;
+
+ public PlanWithNextSnapshotId(TableScan.Plan plan, Long
nextSnapshotId) {
+ this.plan = plan;
+ this.nextSnapshotId = nextSnapshotId;
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index f511ee2af..a35e309f0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -27,19 +27,25 @@ import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
+import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState;
@@ -193,7 +199,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -209,7 +215,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.add(new DataFilePlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -262,7 +268,7 @@ public class ContinuousFileSplitEnumeratorTest {
new TestingSplitEnumeratorContext<>(3);
context.registerReader(0, "test-host");
- Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -277,7 +283,7 @@ public class ContinuousFileSplitEnumeratorTest {
long snapshot = 0;
List<DataSplit> splits = new ArrayList<>();
splits.add(createDataSplit(snapshot, 1, Collections.emptyList()));
- results.add(new DataFilePlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -289,7 +295,7 @@ public class ContinuousFileSplitEnumeratorTest {
splits.clear();
splits.add(createDataSplit(snapshot, 2, Collections.emptyList()));
- results.add(new DataFilePlan(splits));
+ results.put(2L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -308,7 +314,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(2, "test-host");
context.registerReader(3, "test-host");
- Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -325,7 +331,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 100; i++) {
splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
}
- results.add(new DataFilePlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -371,7 +377,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(2, "test-host");
context.registerReader(3, "test-host");
- Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -399,7 +405,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 100; i++) {
splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
}
- results.add(new DataFilePlan(splits));
+ results.put(1L, new DataFilePlan(splits));
// trigger assign task 0 and task 1 will get their assignment
context.triggerAllActions();
@@ -428,7 +434,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -445,7 +451,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.add(new DataFilePlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -469,7 +475,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -487,7 +493,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.add(new DataFilePlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.registeredReaders().remove(1);
// assign to task 0
@@ -495,7 +501,73 @@ public class ContinuousFileSplitEnumeratorTest {
.doesNotThrowAnyException();
}
- private static List<DataSplit> toDataSplits(List<FileStoreSourceSplit>
splits) {
+ /**
+ * Verifies that {@code snapshotState} records the next snapshot id belonging to the plan
+ * most recently handled by {@code processDiscoveredSplits}. This must hold even when the
+ * worker has already scanned further plans whose handlers are still queued on the
+ * coordinator. In that situation, using the scan's latest checkpoint would skip the
+ * unprocessed plan.
+ */
+ @Test
+ public void testEnumeratorWithCheckpoint() {
+ final TestingAsyncSplitEnumeratorContext<FileStoreSourceSplit> context
=
+ new TestingAsyncSplitEnumeratorContext<>(1);
+ context.registerReader(0, "test-host");
+
+ // prepare test data
+ // Plans are keyed by snapshot id 1..4; MockScan reports (key + 1) as the next snapshot id.
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ Map<Long, List<DataSplit>> expectedResults = new HashMap<>(4);
+ StreamTableScan scan = new MockScan(results);
+ for (int i = 1; i <= 4; i++) {
+ List<DataSplit> dataSplits =
+ Collections.singletonList(createDataSplit(i, 0,
Collections.emptyList()));
+ results.put((long) i, new DataFilePlan(dataSplits));
+ expectedResults.put((long) i, dataSplits);
+ }
+
+ final ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .build();
+ enumerator.start();
+
+ PendingSplitsCheckpoint state;
+ final AtomicReference<PendingSplitsCheckpoint> checkpoint = new
AtomicReference<>();
+
+ // empty plan
+ // Nothing has been processed yet: no snapshot id and no pending splits are checkpointed.
+ context.runInCoordinatorThread(
+ () -> checkpoint.set(enumerator.snapshotState(1L))); //
checkpoint
+ context.triggerAlCoordinatorAction();
+ state = checkpoint.getAndSet(null);
+ assertThat(state).isNotNull();
+ assertThat(state.currentSnapshotId()).isNull();
+ assertThat(state.splits()).isEmpty();
+
+ // scan first plan
+ context.triggerAllWorkerAction(); // scan next plan
+ context.triggerAlCoordinatorAction(); // processDiscoveredSplits
+ // NOTE(review): checkpoint id 1L is reused here; ids are not asserted, so this looks
+ // harmless, but a distinct id would make the sequence easier to follow.
+ context.runInCoordinatorThread(
+ () -> checkpoint.set(enumerator.snapshotState(1L))); //
snapshotState
+ context.triggerAlCoordinatorAction();
+ state = checkpoint.getAndSet(null);
+ assertThat(state).isNotNull();
+ // Plan 1 was processed, so the next snapshot to read is 2 and plan 1's splits are pending.
+ assertThat(state.currentSnapshotId()).isEqualTo(2L);
+
assertThat(toDataSplits(state.splits())).containsExactlyElementsOf(expectedResults.get(1L));
+
+ // assign first plan's splits
+ enumerator.handleSplitRequest(0, "test");
+ context.triggerAlCoordinatorAction();
+
+ // multiple plans happen before processDiscoveredSplits
+ // Coordinator queue after the next three calls: [handler(plan 2), snapshotState,
+ // handler(plan 3)]. The scan itself has already advanced past plan 3, but the
+ // checkpoint must still reflect plan 2 (next snapshot 3) with plan 2's splits.
+ context.triggerAllWorkerAction(); // scan next plan
+ context.runInCoordinatorThread(
+ () -> checkpoint.set(enumerator.snapshotState(2L))); //
snapshotState
+ context.triggerAllWorkerAction(); // scan next plan
+ context.triggerNextCoordinatorAction(); // process first discovered
splits
+ context.triggerNextCoordinatorAction(); // checkpoint
+ state = checkpoint.getAndSet(null);
+ assertThat(state).isNotNull();
+ assertThat(state.currentSnapshotId()).isEqualTo(3L);
+
assertThat(toDataSplits(state.splits())).containsExactlyElementsOf(expectedResults.get(2L));
+ }
+
+ private static List<DataSplit>
toDataSplits(Collection<FileStoreSourceSplit> splits) {
return splits.stream()
.map(FileStoreSourceSplit::split)
.map(split -> (DataSplit) split)
@@ -568,19 +640,23 @@ public class ContinuousFileSplitEnumeratorTest {
}
private static class MockScan implements StreamTableScan {
- private final Queue<Plan> results;
- public MockScan(Queue<Plan> results) {
+ private final TreeMap<Long, Plan> results;
+ private @Nullable Long nextSnapshotId;
+
+ public MockScan(TreeMap<Long, Plan> results) {
this.results = results;
+ this.nextSnapshotId = null;
}
@Override
public Plan plan() {
- Plan plan = results.poll();
- if (plan == null) {
+ Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
+ if (planEntry == null) {
throw new EndOfScanException();
}
- return plan;
+ nextSnapshotId = planEntry.getKey() + 1;
+ return planEntry.getValue();
}
@Override
@@ -590,7 +666,7 @@ public class ContinuousFileSplitEnumeratorTest {
@Override
public Long checkpoint() {
- return null;
+ return nextSnapshotId;
}
@Override
@@ -605,4 +681,44 @@ public class ContinuousFileSplitEnumeratorTest {
@Override
public void restore(Long state) {}
}
+
+ /**
+ * Test enumerator context that separates the two halves of {@code callAsync}.
+ *
+ * <p>Callables are submitted to a manually triggered {@code workerExecutor}. Their
+ * handlers are dispatched through {@link ExecutorNotifier} to the base context's
+ * executor, which the test drives as the coordinator thread. Tests can therefore
+ * interleave worker-side scans with coordinator-side actions in a chosen order.
+ */
+ private static class TestingAsyncSplitEnumeratorContext<SplitT extends
SourceSplit>
+ extends TestingSplitEnumeratorContext<SplitT> {
+
+ private final ManuallyTriggeredScheduledExecutorService workerExecutor;
+ private final ExecutorNotifier notifier;
+
+ public TestingAsyncSplitEnumeratorContext(int parallelism) {
+ super(parallelism);
+ this.workerExecutor = new
ManuallyTriggeredScheduledExecutorService();
+ // Callables run on workerExecutor; handlers are forwarded to the base context's executor.
+ this.notifier = new ExecutorNotifier(workerExecutor,
super.getExecutorService());
+ }
+
+ @Override
+ public <T> void callAsync(Callable<T> callable, BiConsumer<T,
Throwable> handler) {
+ notifier.notifyReadyAsync(callable, handler);
+ }
+
+ @Override
+ public <T> void callAsync(
+ Callable<T> callable,
+ BiConsumer<T, Throwable> handler,
+ long initialDelay,
+ long period) {
+ notifier.notifyReadyAsync(callable, handler, initialDelay, period);
+ }
+
+ /** Triggers periodic scheduled tasks, then all queued tasks, on the worker executor. */
+ public void triggerAllWorkerAction() {
+ this.workerExecutor.triggerPeriodicScheduledTasks();
+ this.workerExecutor.triggerAll();
+ }
+
+ /**
+ * Runs all pending coordinator-side actions via {@code triggerAllActions()}.
+ *
+ * <p>NOTE(review): the name is missing an "l" ("triggerAllCoordinatorAction"); a
+ * rename must also update the callers in testEnumeratorWithCheckpoint.
+ */
+ public void triggerAlCoordinatorAction() {
+ super.triggerAllActions();
+ }
+
+ /** Triggers only the next pending task on the coordinator executor (single-step). */
+ public void triggerNextCoordinatorAction() {
+ super.getExecutorService().trigger();
+ }
+ }
}