This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 737fb191a [flink] Limit max split while continuously scanning to avoid JobManager OOM (#2373)
737fb191a is described below

commit 737fb191ada173551533ba6bbf7ecf2dc4b08fec
Author: YeJunHao <[email protected]>
AuthorDate: Fri Nov 24 10:41:18 2023 +0800

    [flink] Limit max split while continuously scanning to avoid JobManager OOM (#2373)
---
 .../shortcodes/generated/core_configuration.html   |  6 +++
 .../main/java/org/apache/paimon/CoreOptions.java   | 12 ++++++
 .../source/ContinuousFileSplitEnumerator.java      | 28 ++++++++++----
 .../flink/source/ContinuousFileStoreSource.java    |  6 ++-
 .../AlignedContinuousFileSplitEnumerator.java      | 33 +++++++++++-----
 .../align/AlignedContinuousFileStoreSource.java    |  3 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  | 45 +++++++++++++++++++++-
 .../AlignedContinuousFileSplitEnumeratorTest.java  |  2 +-
 8 files changed, 112 insertions(+), 23 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 0a5b58c97..bc832c5fb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -431,6 +431,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td>Integer</td>
             <td>The parallelism of scanning manifest files, default value is the size of cpu processor. Note: Scale-up this parameter will increase memory usage while scanning manifest files. We can consider downsize it when we encounter an out of memory exception while scanning</td>
         </tr>
+        <tr>
+            <td><h5>scan.max-splits-per-task</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>Max split size should be cached for one task while scanning. If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning.</td>
+        </tr>
         <tr>
             <td><h5>scan.mode</h5></td>
             <td style="word-wrap: break-word;">default</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 96e6473a2..7f5c83863 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -217,6 +217,14 @@ public class CoreOptions implements Serializable {
                     .defaultValue(Duration.ofSeconds(10))
                     .withDescription("The discovery interval of continuous reading.");
 
+    public static final ConfigOption<Integer> SCAN_MAX_SPLITS_PER_TASK =
+            key("scan.max-splits-per-task")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "Max split size should be cached for one task while scanning. "
+                                    + "If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning.");
+
     @Immutable
     public static final ConfigOption<MergeEngine> MERGE_ENGINE =
             key("merge-engine")
@@ -1132,6 +1140,10 @@ public class CoreOptions implements Serializable {
         return options.get(CONTINUOUS_DISCOVERY_INTERVAL);
     }
 
+    public int scanSplitMaxPerTask() {
+        return options.get(SCAN_MAX_SPLITS_PER_TASK);
+    }
+
     public int localSortMaxNumFileHandles() {
         return options.get(LOCAL_SORT_MAX_NUM_FILE_HANDLES);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index e8ca0f7ca..e15d8ddf4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -47,6 +47,7 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -57,7 +58,6 @@ public class ContinuousFileSplitEnumerator
         implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
-    private static final int SPLIT_MAX_NUM = 5_000;
 
     protected final SplitEnumeratorContext<FileStoreSourceSplit> context;
 
@@ -73,6 +73,8 @@ public class ContinuousFileSplitEnumerator
 
     protected final ConsumerProgressCalculator consumerProgressCalculator;
 
+    private final int splitMaxNum;
+
     @Nullable protected Long nextSnapshotId;
 
     protected boolean finished = false;
@@ -85,7 +87,8 @@ public class ContinuousFileSplitEnumerator
             @Nullable Long nextSnapshotId,
             long discoveryInterval,
             StreamTableScan scan,
-            BucketMode bucketMode) {
+            BucketMode bucketMode,
+            int splitMaxPerTask) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
         this.nextSnapshotId = nextSnapshotId;
@@ -94,6 +97,7 @@ public class ContinuousFileSplitEnumerator
         this.splitGenerator = new FileStoreSourceSplitGenerator();
         this.scan = scan;
         this.splitAssigner = createSplitAssigner(bucketMode);
+        this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
         addSplits(remainSplits);
 
         this.consumerProgressCalculator =
@@ -135,7 +139,7 @@ public class ContinuousFileSplitEnumerator
         assignSplits();
         // if current task assigned no split, we check conditions to scan one more time
         if (readersAwaitingSplit.contains(subtaskId)) {
-            if (stopTriggerScan || splitAssigner.remainingSplits().size() >= SPLIT_MAX_NUM) {
+            if (stopTriggerScan) {
                 return;
             }
             stopTriggerScan = true;
@@ -185,17 +189,21 @@ public class ContinuousFileSplitEnumerator
     // ------------------------------------------------------------------------
 
     // this need to be synchronized because scan object is not thread safe. handleSplitRequest and
-    // context.callAsync will invoke this.
-    protected synchronized PlanWithNextSnapshotId scanNextSnapshot() {
+    // context.callAsync will invoke this. This method runs in workerExecutorThreadPool in
+    // parallelism.
+    protected synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
+        if (splitAssigner.remainingSplits().size() >= splitMaxNum) {
+            return Optional.empty();
+        }
         TableScan.Plan plan = scan.plan();
         Long nextSnapshotId = scan.checkpoint();
-        return new PlanWithNextSnapshotId(plan, nextSnapshotId);
+        return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId));
     }
 
     // this mothod could not be synchronized, because it runs in coordinatorThread, which will make
     // it serialize.
     protected void processDiscoveredSplits(
-            PlanWithNextSnapshotId planWithNextSnapshotId, Throwable error) {
+            Optional<PlanWithNextSnapshotId> planWithNextSnapshotIdOptional, Throwable error) {
         if (error != null) {
             if (error instanceof EndOfScanException) {
                 // finished
@@ -208,6 +216,10 @@ public class ContinuousFileSplitEnumerator
             return;
         }
 
+        if (!planWithNextSnapshotIdOptional.isPresent()) {
+            return;
+        }
+        PlanWithNextSnapshotId planWithNextSnapshotId = planWithNextSnapshotIdOptional.get();
         nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
         TableScan.Plan plan = planWithNextSnapshotId.plan;
         if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
@@ -239,7 +251,7 @@ public class ContinuousFileSplitEnumerator
                 continue;
             }
             List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, null);
-            if (splits.size() > 0) {
+            if (!splits.isEmpty()) {
                 assignment.put(task, splits);
                 consumerProgressCalculator.updateAssignInformation(task, splits.get(0));
             }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 98c1e6e1b..9691a8df5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -88,12 +88,14 @@ public class ContinuousFileStoreSource extends FlinkSource {
             Collection<FileStoreSourceSplit> splits,
             @Nullable Long nextSnapshotId,
             StreamTableScan scan) {
+        CoreOptions coreOptions = CoreOptions.fromMap(options);
         return new ContinuousFileSplitEnumerator(
                 context,
                 splits,
                 nextSnapshotId,
-                CoreOptions.fromMap(options).continuousDiscoveryInterval().toMillis(),
+                coreOptions.continuousDiscoveryInterval().toMillis(),
                 scan,
-                bucketMode);
+                bucketMode,
+                coreOptions.scanSplitMaxPerTask());
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index 62d0abae3..87ae88a79 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 
@@ -92,8 +93,16 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
             long discoveryInterval,
             StreamTableScan scan,
             BucketMode bucketMode,
-            long alignTimeout) {
-        super(context, remainSplits, nextSnapshotId, discoveryInterval, scan, bucketMode);
+            long alignTimeout,
+            int splitPerTaskMax) {
+        super(
+                context,
+                remainSplits,
+                nextSnapshotId,
+                discoveryInterval,
+                scan,
+                bucketMode,
+                splitPerTaskMax);
         this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
         this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
         this.nextSnapshotId = nextSnapshotId;
@@ -193,21 +202,25 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
     // ------------------------------------------------------------------------
 
     @Override
-    protected PlanWithNextSnapshotId scanNextSnapshot() {
+    protected Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
         if (pendingPlans.remainingCapacity() > 0) {
-            PlanWithNextSnapshotId scannedPlan = super.scanNextSnapshot();
-            if (!(scannedPlan.plan() instanceof SnapshotNotExistPlan)) {
-                synchronized (lock) {
-                    pendingPlans.add(scannedPlan);
-                    lock.notifyAll();
+            Optional<PlanWithNextSnapshotId> scannedPlanOptional = super.scanNextSnapshot();
+            if (scannedPlanOptional.isPresent()) {
+                PlanWithNextSnapshotId scannedPlan = scannedPlanOptional.get();
+                if (!(scannedPlan.plan() instanceof SnapshotNotExistPlan)) {
+                    synchronized (lock) {
+                        pendingPlans.add(scannedPlan);
+                        lock.notifyAll();
+                    }
                 }
             }
         }
-        return null;
+        return Optional.empty();
     }
 
     @Override
-    protected void processDiscoveredSplits(PlanWithNextSnapshotId ignore, Throwable error) {
+    protected void processDiscoveredSplits(
+            Optional<PlanWithNextSnapshotId> ignore, Throwable error) {
         if (error != null) {
             if (error instanceof EndOfScanException) {
                 // finished
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 063daf1b3..48f41b008 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -90,6 +90,7 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
                 options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
                 scan,
                 bucketMode,
-                options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis());
+                options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
+                options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK));
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index dfc788403..489a1d4fc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
 import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;
+import org.assertj.core.api.Assertions;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
@@ -750,6 +751,48 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
         assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(2L);
     }
 
+    @Test
+    public void testEnumeratorSplitMax() throws Exception {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                getSplitEnumeratorContext(2);
+
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        StreamTableScan scan = new MockScan(results);
+        ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .withBucketMode(BucketMode.UNAWARE)
+                        .build();
+        enumerator.start();
+
+        long snapshot = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        for (int i = 0; i < 16; i++) {
+            splits.add(createDataSplit(snapshot++, i, Collections.emptyList()));
+        }
+        results.put(1L, new DataFilePlan(splits));
+        context.triggerAllActions();
+
+        splits = new ArrayList<>();
+        for (int i = 0; i < 16; i++) {
+            splits.add(createDataSplit(snapshot++, i, Collections.emptyList()));
+        }
+        results.put(2L, new DataFilePlan(splits));
+        context.triggerAllActions();
+
+        splits = new ArrayList<>();
+        for (int i = 0; i < 16; i++) {
+            splits.add(createDataSplit(snapshot++, i, Collections.emptyList()));
+        }
+        results.put(3L, new DataFilePlan(splits));
+        context.triggerAllActions();
+
+        Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(16 * 2);
+    }
+
     private void triggerCheckpointAndComplete(
             ContinuousFileSplitEnumerator enumerator, long checkpointId) throws Exception {
         enumerator.snapshotState(checkpointId);
@@ -825,7 +868,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
 
         public ContinuousFileSplitEnumerator build() {
             return new ContinuousFileSplitEnumerator(
-                    context, initialSplits, null, discoveryInterval, scan, bucketMode);
+                    context, initialSplits, null, discoveryInterval, scan, bucketMode, 10);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index 76358673d..24c8754f5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -246,7 +246,7 @@ public class AlignedContinuousFileSplitEnumeratorTest extends FileSplitEnumerato
 
         public AlignedContinuousFileSplitEnumerator build() {
             return new AlignedContinuousFileSplitEnumerator(
-                    context, initialSplits, null, discoveryInterval, scan, bucketMode, timeout);
+                    context, initialSplits, null, discoveryInterval, scan, bucketMode, timeout, 10);
         }
     }
 }

Reply via email to