This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 55b819407 [flink] Stream read snapshot slow while reading from 
history, waiting for time interval to trigger stream scan (#1704)
55b819407 is described below

commit 55b8194076b2c00f23d097efaac1935acbf11fd7
Author: YeJunHao <[email protected]>
AuthorDate: Fri Aug 4 16:37:02 2023 +0800

    [flink] Stream read snapshot slow while reading from history, waiting for 
time interval to trigger stream scan (#1704)
---
 .../source/ContinuousFileSplitEnumerator.java      |  65 ++++++---
 .../source/ContinuousFileSplitEnumeratorTest.java  | 154 ++++++++++++++++++++-
 2 files changed, 197 insertions(+), 22 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 46bc71e5c..1e2161fdb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.source.assigners.SplitAssigner;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
 
@@ -55,6 +56,7 @@ public class ContinuousFileSplitEnumerator
         implements SplitEnumerator<FileStoreSourceSplit, 
PendingSplitsCheckpoint> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
+    private static final int SPLIT_MAX_NUM = 5_000;
 
     protected final SplitEnumeratorContext<FileStoreSourceSplit> context;
 
@@ -72,6 +74,7 @@ public class ContinuousFileSplitEnumerator
 
     @Nullable protected Long nextSnapshotId;
 
+    protected boolean blockScanByRequest = false;
     protected boolean finished = false;
 
     public ContinuousFileSplitEnumerator(
@@ -120,7 +123,7 @@ public class ContinuousFileSplitEnumerator
     @Override
     public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
         readersAwaitingSplit.add(subtaskId);
-        assignSplits();
+        doHandleSplitRequest(subtaskId);
     }
 
     @Override
@@ -146,12 +149,16 @@ public class ContinuousFileSplitEnumerator
 
     // ------------------------------------------------------------------------
 
-    protected PlanWithNextSnapshotId scanNextSnapshot() {
+    // this needs to be synchronized because the scan object is not thread safe. 
handleSplitRequest and
+    // context.callAsync will invoke this.
+    protected synchronized PlanWithNextSnapshotId scanNextSnapshot() {
         TableScan.Plan plan = scan.plan();
         Long nextSnapshotId = scan.checkpoint();
         return new PlanWithNextSnapshotId(plan, nextSnapshotId);
     }
 
+    // this method could not be synchronized, because it runs in 
coordinatorThread, which will make
+    // it serialized.
     protected void processDiscoveredSplits(
             PlanWithNextSnapshotId planWithNextSnapshotId, Throwable error) {
         if (error != null) {
@@ -168,6 +175,12 @@ public class ContinuousFileSplitEnumerator
 
         nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
         TableScan.Plan plan = planWithNextSnapshotId.plan;
+        if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
+            blockScanByRequest = true;
+            return;
+        } else {
+            blockScanByRequest = false;
+        }
 
         if (plan.splits().isEmpty()) {
             return;
@@ -177,12 +190,39 @@ public class ContinuousFileSplitEnumerator
         assignSplits();
     }
 
+    private void doHandleSplitRequest(int taskId) {
+        assignSplits();
+        // if the current task was assigned no split, we check conditions to scan one 
more time
+        if (readersAwaitingSplit.contains(taskId)) {
+            if (!shouldInvokeScan()) {
+                return;
+            }
+            blockScanByRequest = true;
+            context.callAsync(this::scanNextSnapshot, 
this::processDiscoveredSplits);
+        }
+    }
+
     /**
      * Method should be synchronized because {@link #handleSplitRequest} and 
{@link
      * #processDiscoveredSplits} have thread conflicts.
      */
     protected synchronized void assignSplits() {
-        Map<Integer, List<FileStoreSourceSplit>> assignment = 
createAssignment();
+        // create assignment
+        Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
+        Iterator<Integer> readersAwait = readersAwaitingSplit.iterator();
+        while (readersAwait.hasNext()) {
+            Integer task = readersAwait.next();
+            if (!context.registeredReaders().containsKey(task)) {
+                readersAwait.remove();
+                continue;
+            }
+            List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, 
null);
+            if (splits.size() > 0) {
+                assignment.put(task, splits);
+            }
+        }
+
+        // remove readers who fetched splits
         if (noMoreSplits()) {
             Iterator<Integer> iterator = readersAwaitingSplit.iterator();
             while (iterator.hasNext()) {
@@ -197,26 +237,13 @@ public class ContinuousFileSplitEnumerator
         context.assignSplits(new SplitsAssignment<>(assignment));
     }
 
-    private Map<Integer, List<FileStoreSourceSplit>> createAssignment() {
-        Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
-        Iterator<Integer> readersAwait = readersAwaitingSplit.iterator();
-        while (readersAwait.hasNext()) {
-            Integer task = readersAwait.next();
-            if (!context.registeredReaders().containsKey(task)) {
-                readersAwait.remove();
-                continue;
-            }
-            List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, 
null);
-            if (splits.size() > 0) {
-                assignment.put(task, splits);
-            }
-        }
-        return assignment;
+    private boolean shouldInvokeScan() {
+        return !blockScanByRequest && splitAssigner.remainingSplits().size() 
<= SPLIT_MAX_NUM;
     }
 
     protected int assignTask(int bucket) {
         if (bucketMode == BucketMode.UNAWARE) {
-            // we just assign task 0 when bucket unaware
+            // we just assign task 0 when bucket unaware, because we don't 
need this.
             return 0;
         } else {
             // if not bucket unaware, we assign the bucket % parallelism, the 
same bucket data go
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index ce4acc286..fc69b2aed 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
 
@@ -200,7 +201,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(1, "test-host");
 
         TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
-        StreamTableScan scan = new MockScan(results);
+        MockScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
                         .setSplitEnumeratorContext(context)
@@ -378,7 +379,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(3, "test-host");
 
         TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
-        StreamTableScan scan = new MockScan(results);
+        MockScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
                         .setSplitEnumeratorContext(context)
@@ -390,6 +391,7 @@ public class ContinuousFileSplitEnumeratorTest {
         enumerator.start();
 
         // assign to task 0, but no assigned. add to wait list
+        scan.allowEnd(false);
         enumerator.handleSplitRequest(0, "test-host");
         Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
                 context.getSplitAssignments();
@@ -501,6 +503,143 @@ public class ContinuousFileSplitEnumeratorTest {
                 .doesNotThrowAnyException();
     }
 
+    @Test
+    public void testTriggerScanByTaskRequest() throws Exception {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(2);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        MockScan scan = new MockScan(results);
+        scan.allowEnd(false);
+        ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .build();
+        enumerator.start();
+
+        long snapshot = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+        }
+        results.put(1L, new DataFilePlan(splits));
+
+        // request directly
+        enumerator.handleSplitRequest(0, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0);
+        List<FileStoreSourceSplit> assignedSplits = 
assignments.get(0).getAssignedSplits();
+        
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(0));
+
+        enumerator.handleSplitRequest(1, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        assignedSplits = assignments.get(1).getAssignedSplits();
+        
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(1));
+    }
+
+    @Test
+    public void testNoTriggerWhenReadLatest() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+        context.registerReader(2, "test-host");
+        context.registerReader(3, "test-host");
+
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        MockScan scan = new MockScan(results);
+        scan.allowEnd(false);
+        ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .build();
+        enumerator.start();
+        enumerator.handleSplitRequest(0, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+
+        long snapshot = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        for (int i = 0; i < 2; i++) {
+            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+        }
+        results.put(1L, new DataFilePlan(splits));
+
+        // will not trigger scan here
+        enumerator.handleSplitRequest(0, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments).isEmpty();
+
+        enumerator.handleSplitRequest(1, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).isEmpty();
+
+        // trigger all actions, we will scan anyway
+        context.triggerAllActions();
+
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        List<FileStoreSourceSplit> assignedSplits = 
assignments.get(0).getAssignedSplits();
+        
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(0));
+        assignedSplits = assignments.get(1).getAssignedSplits();
+        
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(1));
+
+        splits.clear();
+        for (int i = 2; i < 4; i++) {
+            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+        }
+        results.put(2L, new DataFilePlan(splits));
+        // blockScanByRequest = false here, so this request will trigger a 
scan
+        enumerator.handleSplitRequest(2, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+        enumerator.handleSplitRequest(3, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
+        assignedSplits = assignments.get(2).getAssignedSplits();
+        
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(0));
+        assignedSplits = assignments.get(3).getAssignedSplits();
+        
assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(1));
+
+        // this will trigger scan, and then set blockScanByRequest = true
+        enumerator.handleSplitRequest(3, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+        splits.clear();
+        splits.add(createDataSplit(snapshot, 7, Collections.emptyList()));
+        results.put(3L, new DataFilePlan(splits));
+
+        // this won't trigger a scan, because blockScanByRequest = true
+        enumerator.handleSplitRequest(3, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+        assignments = context.getSplitAssignments();
+        assignedSplits = assignments.get(3).getAssignedSplits();
+        assertThat(toDataSplits(assignedSplits)).doesNotContain(splits.get(0));
+
+        // forcibly set blockScanByRequest = false, so the split request below 
will trigger a scan
+        enumerator.blockScanByRequest = false;
+        // trigger scan here
+        enumerator.handleSplitRequest(3, "test-host");
+        context.getExecutorService().triggerAllNonPeriodicTasks();
+        assignments = context.getSplitAssignments();
+        assignedSplits = assignments.get(3).getAssignedSplits();
+        // get expected split
+        assertThat(toDataSplits(assignedSplits)).contains(splits.get(0));
+    }
+
     @Test
     public void testEnumeratorWithCheckpoint() {
         final TestingAsyncSplitEnumeratorContext<FileStoreSourceSplit> context 
=
@@ -652,6 +791,7 @@ public class ContinuousFileSplitEnumeratorTest {
 
         private final TreeMap<Long, Plan> results;
         private @Nullable Long nextSnapshotId;
+        private boolean allowEnd = true;
 
         public MockScan(TreeMap<Long, Plan> results) {
             this.results = results;
@@ -662,7 +802,11 @@ public class ContinuousFileSplitEnumeratorTest {
         public Plan plan() {
             Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
             if (planEntry == null) {
-                throw new EndOfScanException();
+                if (allowEnd) {
+                    throw new EndOfScanException();
+                } else {
+                    return SnapshotNotExistPlan.INSTANCE;
+                }
             }
             nextSnapshotId = planEntry.getKey() + 1;
             return planEntry.getValue();
@@ -689,6 +833,10 @@ public class ContinuousFileSplitEnumeratorTest {
 
         @Override
         public void restore(Long state) {}
+
+        public void allowEnd(boolean allowEnd) {
+            this.allowEnd = allowEnd;
+        }
     }
 
     private static class TestingAsyncSplitEnumeratorContext<SplitT extends 
SourceSplit>

Reply via email to