This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f480fd9a9 [flink] fix the issue that the historical snapshot may be 
lost when Enumerator performs snapshotState. (#1562)
f480fd9a9 is described below

commit f480fd9a92818e78e043e108f77ac95ea7f2c1d8
Author: liming.1018 <[email protected]>
AuthorDate: Thu Jul 13 17:22:35 2023 +0800

    [flink] fix the issue that the historical snapshot may be lost when 
Enumerator performs snapshotState. (#1562)
---
 .../source/ContinuousFileSplitEnumerator.java      |  25 +++-
 .../source/ContinuousFileSplitEnumeratorTest.java  | 160 ++++++++++++++++++---
 2 files changed, 160 insertions(+), 25 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 14c5c39af..0c695ebff 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -106,7 +106,8 @@ public class ContinuousFileSplitEnumerator
 
     @Override
     public void start() {
-        context.callAsync(scan::plan, this::processDiscoveredSplits, 0, 
discoveryInterval);
+        context.callAsync(
+                this::scanNextSnapshot, this::processDiscoveredSplits, 0, 
discoveryInterval);
     }
 
     @Override
@@ -148,7 +149,14 @@ public class ContinuousFileSplitEnumerator
 
     // ------------------------------------------------------------------------
 
-    private void processDiscoveredSplits(TableScan.Plan plan, Throwable error) 
{
+    private PlanWithNextSnapshotId scanNextSnapshot() {
+        TableScan.Plan plan = scan.plan();
+        Long nextSnapshotId = scan.checkpoint();
+        return new PlanWithNextSnapshotId(plan, nextSnapshotId);
+    }
+
+    private void processDiscoveredSplits(
+            PlanWithNextSnapshotId planWithNextSnapshotId, Throwable error) {
         if (error != null) {
             if (error instanceof EndOfScanException) {
                 // finished
@@ -161,7 +169,8 @@ public class ContinuousFileSplitEnumerator
             return;
         }
 
-        nextSnapshotId = scan.checkpoint();
+        nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
+        TableScan.Plan plan = planWithNextSnapshotId.plan;
 
         if (plan.splits().isEmpty()) {
             return;
@@ -218,4 +227,14 @@ public class ContinuousFileSplitEnumerator
             return bucket % context.currentParallelism();
         }
     }
+
+    private static class PlanWithNextSnapshotId {
+        private final TableScan.Plan plan;
+        private final Long nextSnapshotId;
+
+        public PlanWithNextSnapshotId(TableScan.Plan plan, Long 
nextSnapshotId) {
+            this.plan = plan;
+            this.nextSnapshotId = nextSnapshotId;
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index f511ee2af..a35e309f0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -27,19 +27,25 @@ import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
 
+import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState;
@@ -193,7 +199,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -209,7 +215,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.add(new DataFilePlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -262,7 +268,7 @@ public class ContinuousFileSplitEnumeratorTest {
                 new TestingSplitEnumeratorContext<>(3);
         context.registerReader(0, "test-host");
 
-        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -277,7 +283,7 @@ public class ContinuousFileSplitEnumeratorTest {
         long snapshot = 0;
         List<DataSplit> splits = new ArrayList<>();
         splits.add(createDataSplit(snapshot, 1, Collections.emptyList()));
-        results.add(new DataFilePlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -289,7 +295,7 @@ public class ContinuousFileSplitEnumeratorTest {
 
         splits.clear();
         splits.add(createDataSplit(snapshot, 2, Collections.emptyList()));
-        results.add(new DataFilePlan(splits));
+        results.put(2L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -308,7 +314,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(2, "test-host");
         context.registerReader(3, "test-host");
 
-        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -325,7 +331,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 100; i++) {
             splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
         }
-        results.add(new DataFilePlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -371,7 +377,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(2, "test-host");
         context.registerReader(3, "test-host");
 
-        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -399,7 +405,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 100; i++) {
             splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
         }
-        results.add(new DataFilePlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         // trigger assign task 0 and task 1 will get their assignment
         context.triggerAllActions();
 
@@ -428,7 +434,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -445,7 +451,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.add(new DataFilePlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -469,7 +475,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -487,7 +493,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.add(new DataFilePlan(splits));
+        results.put(1L, new DataFilePlan(splits));
 
         context.registeredReaders().remove(1);
         // assign to task 0
@@ -495,7 +501,73 @@ public class ContinuousFileSplitEnumeratorTest {
                 .doesNotThrowAnyException();
     }
 
-    private static List<DataSplit> toDataSplits(List<FileStoreSourceSplit> 
splits) {
+    @Test
+    public void testEnumeratorWithCheckpoint() {
+        final TestingAsyncSplitEnumeratorContext<FileStoreSourceSplit> context 
=
+                new TestingAsyncSplitEnumeratorContext<>(1);
+        context.registerReader(0, "test-host");
+
+        // prepare test data
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        Map<Long, List<DataSplit>> expectedResults = new HashMap<>(4);
+        StreamTableScan scan = new MockScan(results);
+        for (int i = 1; i <= 4; i++) {
+            List<DataSplit> dataSplits =
+                    Collections.singletonList(createDataSplit(i, 0, 
Collections.emptyList()));
+            results.put((long) i, new DataFilePlan(dataSplits));
+            expectedResults.put((long) i, dataSplits);
+        }
+
+        final ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .build();
+        enumerator.start();
+
+        PendingSplitsCheckpoint state;
+        final AtomicReference<PendingSplitsCheckpoint> checkpoint = new 
AtomicReference<>();
+
+        // empty plan
+        context.runInCoordinatorThread(
+                () -> checkpoint.set(enumerator.snapshotState(1L))); // 
checkpoint
+        context.triggerAlCoordinatorAction();
+        state = checkpoint.getAndSet(null);
+        assertThat(state).isNotNull();
+        assertThat(state.currentSnapshotId()).isNull();
+        assertThat(state.splits()).isEmpty();
+
+        // scan first plan
+        context.triggerAllWorkerAction(); // scan next plan
+        context.triggerAlCoordinatorAction(); // processDiscoveredSplits
+        context.runInCoordinatorThread(
+                () -> checkpoint.set(enumerator.snapshotState(1L))); // 
snapshotState
+        context.triggerAlCoordinatorAction();
+        state = checkpoint.getAndSet(null);
+        assertThat(state).isNotNull();
+        assertThat(state.currentSnapshotId()).isEqualTo(2L);
+        
assertThat(toDataSplits(state.splits())).containsExactlyElementsOf(expectedResults.get(1L));
+
+        // assign first plan's splits
+        enumerator.handleSplitRequest(0, "test");
+        context.triggerAlCoordinatorAction();
+
+        // multiple plans happen before processDiscoveredSplits
+        context.triggerAllWorkerAction(); // scan next plan
+        context.runInCoordinatorThread(
+                () -> checkpoint.set(enumerator.snapshotState(2L))); // 
snapshotState
+        context.triggerAllWorkerAction(); // scan next plan
+        context.triggerNextCoordinatorAction(); // process first discovered 
splits
+        context.triggerNextCoordinatorAction(); // checkpoint
+        state = checkpoint.getAndSet(null);
+        assertThat(state).isNotNull();
+        assertThat(state.currentSnapshotId()).isEqualTo(3L);
+        
assertThat(toDataSplits(state.splits())).containsExactlyElementsOf(expectedResults.get(2L));
+    }
+
+    private static List<DataSplit> 
toDataSplits(Collection<FileStoreSourceSplit> splits) {
         return splits.stream()
                 .map(FileStoreSourceSplit::split)
                 .map(split -> (DataSplit) split)
@@ -568,19 +640,23 @@ public class ContinuousFileSplitEnumeratorTest {
     }
 
     private static class MockScan implements StreamTableScan {
-        private final Queue<Plan> results;
 
-        public MockScan(Queue<Plan> results) {
+        private final TreeMap<Long, Plan> results;
+        private @Nullable Long nextSnapshotId;
+
+        public MockScan(TreeMap<Long, Plan> results) {
             this.results = results;
+            this.nextSnapshotId = null;
         }
 
         @Override
         public Plan plan() {
-            Plan plan = results.poll();
-            if (plan == null) {
+            Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
+            if (planEntry == null) {
                 throw new EndOfScanException();
             }
-            return plan;
+            nextSnapshotId = planEntry.getKey() + 1;
+            return planEntry.getValue();
         }
 
         @Override
@@ -590,7 +666,7 @@ public class ContinuousFileSplitEnumeratorTest {
 
         @Override
         public Long checkpoint() {
-            return null;
+            return nextSnapshotId;
         }
 
         @Override
@@ -605,4 +681,44 @@ public class ContinuousFileSplitEnumeratorTest {
         @Override
         public void restore(Long state) {}
     }
+
+    private static class TestingAsyncSplitEnumeratorContext<SplitT extends 
SourceSplit>
+            extends TestingSplitEnumeratorContext<SplitT> {
+
+        private final ManuallyTriggeredScheduledExecutorService workerExecutor;
+        private final ExecutorNotifier notifier;
+
+        public TestingAsyncSplitEnumeratorContext(int parallelism) {
+            super(parallelism);
+            this.workerExecutor = new 
ManuallyTriggeredScheduledExecutorService();
+            this.notifier = new ExecutorNotifier(workerExecutor, 
super.getExecutorService());
+        }
+
+        @Override
+        public <T> void callAsync(Callable<T> callable, BiConsumer<T, 
Throwable> handler) {
+            notifier.notifyReadyAsync(callable, handler);
+        }
+
+        @Override
+        public <T> void callAsync(
+                Callable<T> callable,
+                BiConsumer<T, Throwable> handler,
+                long initialDelay,
+                long period) {
+            notifier.notifyReadyAsync(callable, handler, initialDelay, period);
+        }
+
+        public void triggerAllWorkerAction() {
+            this.workerExecutor.triggerPeriodicScheduledTasks();
+            this.workerExecutor.triggerAll();
+        }
+
+        public void triggerAlCoordinatorAction() {
+            super.triggerAllActions();
+        }
+
+        public void triggerNextCoordinatorAction() {
+            super.getExecutorService().trigger();
+        }
+    }
 }

Reply via email to the mailing list.