This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 737fb191a [flink] Limit max split while continuously scanning to avoid JobManager OOM (#2373)
737fb191a is described below
commit 737fb191ada173551533ba6bbf7ecf2dc4b08fec
Author: YeJunHao <[email protected]>
AuthorDate: Fri Nov 24 10:41:18 2023 +0800
[flink] Limit max split while continuously scanning to avoid JobManager OOM (#2373)
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../main/java/org/apache/paimon/CoreOptions.java | 12 ++++++
.../source/ContinuousFileSplitEnumerator.java | 28 ++++++++++----
.../flink/source/ContinuousFileStoreSource.java | 6 ++-
.../AlignedContinuousFileSplitEnumerator.java | 33 +++++++++++-----
.../align/AlignedContinuousFileStoreSource.java | 3 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 45 +++++++++++++++++++++-
.../AlignedContinuousFileSplitEnumeratorTest.java | 2 +-
8 files changed, 112 insertions(+), 23 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 0a5b58c97..bc832c5fb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -431,6 +431,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Integer</td>
<td>The parallelism of scanning manifest files, default value is
the size of cpu processor. Note: Scale-up this parameter will increase memory
usage while scanning manifest files. We can consider downsize it when we
encounter an out of memory exception while scanning</td>
</tr>
+ <tr>
+ <td><h5>scan.max-splits-per-task</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>Integer</td>
+ <td>Max split size should be cached for one task while scanning.
If splits size cached in enumerator are greater than tasks size multiply by
this value, scanner will pause scanning.</td>
+ </tr>
<tr>
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 96e6473a2..7f5c83863 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -217,6 +217,14 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofSeconds(10))
.withDescription("The discovery interval of continuous
reading.");
+ public static final ConfigOption<Integer> SCAN_MAX_SPLITS_PER_TASK =
+ key("scan.max-splits-per-task")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "Max split size should be cached for one task
while scanning. "
+ + "If splits size cached in enumerator are
greater than tasks size multiply by this value, scanner will pause scanning.");
+
@Immutable
public static final ConfigOption<MergeEngine> MERGE_ENGINE =
key("merge-engine")
@@ -1132,6 +1140,10 @@ public class CoreOptions implements Serializable {
return options.get(CONTINUOUS_DISCOVERY_INTERVAL);
}
+ public int scanSplitMaxPerTask() {
+ return options.get(SCAN_MAX_SPLITS_PER_TASK);
+ }
+
public int localSortMaxNumFileHandles() {
return options.get(LOCAL_SORT_MAX_NUM_FILE_HANDLES);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index e8ca0f7ca..e15d8ddf4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -47,6 +47,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -57,7 +58,6 @@ public class ContinuousFileSplitEnumerator
implements SplitEnumerator<FileStoreSourceSplit,
PendingSplitsCheckpoint> {
private static final Logger LOG =
LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
- private static final int SPLIT_MAX_NUM = 5_000;
protected final SplitEnumeratorContext<FileStoreSourceSplit> context;
@@ -73,6 +73,8 @@ public class ContinuousFileSplitEnumerator
protected final ConsumerProgressCalculator consumerProgressCalculator;
+ private final int splitMaxNum;
+
@Nullable protected Long nextSnapshotId;
protected boolean finished = false;
@@ -85,7 +87,8 @@ public class ContinuousFileSplitEnumerator
@Nullable Long nextSnapshotId,
long discoveryInterval,
StreamTableScan scan,
- BucketMode bucketMode) {
+ BucketMode bucketMode,
+ int splitMaxPerTask) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.nextSnapshotId = nextSnapshotId;
@@ -94,6 +97,7 @@ public class ContinuousFileSplitEnumerator
this.splitGenerator = new FileStoreSourceSplitGenerator();
this.scan = scan;
this.splitAssigner = createSplitAssigner(bucketMode);
+ this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
addSplits(remainSplits);
this.consumerProgressCalculator =
@@ -135,7 +139,7 @@ public class ContinuousFileSplitEnumerator
assignSplits();
// if current task assigned no split, we check conditions to scan one
more time
if (readersAwaitingSplit.contains(subtaskId)) {
- if (stopTriggerScan || splitAssigner.remainingSplits().size() >=
SPLIT_MAX_NUM) {
+ if (stopTriggerScan) {
return;
}
stopTriggerScan = true;
@@ -185,17 +189,21 @@ public class ContinuousFileSplitEnumerator
// ------------------------------------------------------------------------
// this need to be synchronized because scan object is not thread safe.
handleSplitRequest and
- // context.callAsync will invoke this.
- protected synchronized PlanWithNextSnapshotId scanNextSnapshot() {
+ // context.callAsync will invoke this. This method runs in
workerExecutorThreadPool in
+ // parallelism.
+ protected synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot()
{
+ if (splitAssigner.remainingSplits().size() >= splitMaxNum) {
+ return Optional.empty();
+ }
TableScan.Plan plan = scan.plan();
Long nextSnapshotId = scan.checkpoint();
- return new PlanWithNextSnapshotId(plan, nextSnapshotId);
+ return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId));
}
// this mothod could not be synchronized, because it runs in
coordinatorThread, which will make
// it serialize.
protected void processDiscoveredSplits(
- PlanWithNextSnapshotId planWithNextSnapshotId, Throwable error) {
+ Optional<PlanWithNextSnapshotId> planWithNextSnapshotIdOptional,
Throwable error) {
if (error != null) {
if (error instanceof EndOfScanException) {
// finished
@@ -208,6 +216,10 @@ public class ContinuousFileSplitEnumerator
return;
}
+ if (!planWithNextSnapshotIdOptional.isPresent()) {
+ return;
+ }
+ PlanWithNextSnapshotId planWithNextSnapshotId =
planWithNextSnapshotIdOptional.get();
nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
TableScan.Plan plan = planWithNextSnapshotId.plan;
if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
@@ -239,7 +251,7 @@ public class ContinuousFileSplitEnumerator
continue;
}
List<FileStoreSourceSplit> splits = splitAssigner.getNext(task,
null);
- if (splits.size() > 0) {
+ if (!splits.isEmpty()) {
assignment.put(task, splits);
consumerProgressCalculator.updateAssignInformation(task,
splits.get(0));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 98c1e6e1b..9691a8df5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -88,12 +88,14 @@ public class ContinuousFileStoreSource extends FlinkSource {
Collection<FileStoreSourceSplit> splits,
@Nullable Long nextSnapshotId,
StreamTableScan scan) {
+ CoreOptions coreOptions = CoreOptions.fromMap(options);
return new ContinuousFileSplitEnumerator(
context,
splits,
nextSnapshotId,
-
CoreOptions.fromMap(options).continuousDiscoveryInterval().toMillis(),
+ coreOptions.continuousDiscoveryInterval().toMillis(),
scan,
- bucketMode);
+ bucketMode,
+ coreOptions.scanSplitMaxPerTask());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index 62d0abae3..87ae88a79 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
@@ -92,8 +93,16 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
long discoveryInterval,
StreamTableScan scan,
BucketMode bucketMode,
- long alignTimeout) {
- super(context, remainSplits, nextSnapshotId, discoveryInterval, scan,
bucketMode);
+ long alignTimeout,
+ int splitPerTaskMax) {
+ super(
+ context,
+ remainSplits,
+ nextSnapshotId,
+ discoveryInterval,
+ scan,
+ bucketMode,
+ splitPerTaskMax);
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
this.nextSnapshotId = nextSnapshotId;
@@ -193,21 +202,25 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
// ------------------------------------------------------------------------
@Override
- protected PlanWithNextSnapshotId scanNextSnapshot() {
+ protected Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
if (pendingPlans.remainingCapacity() > 0) {
- PlanWithNextSnapshotId scannedPlan = super.scanNextSnapshot();
- if (!(scannedPlan.plan() instanceof SnapshotNotExistPlan)) {
- synchronized (lock) {
- pendingPlans.add(scannedPlan);
- lock.notifyAll();
+ Optional<PlanWithNextSnapshotId> scannedPlanOptional =
super.scanNextSnapshot();
+ if (scannedPlanOptional.isPresent()) {
+ PlanWithNextSnapshotId scannedPlan = scannedPlanOptional.get();
+ if (!(scannedPlan.plan() instanceof SnapshotNotExistPlan)) {
+ synchronized (lock) {
+ pendingPlans.add(scannedPlan);
+ lock.notifyAll();
+ }
}
}
}
- return null;
+ return Optional.empty();
}
@Override
- protected void processDiscoveredSplits(PlanWithNextSnapshotId ignore,
Throwable error) {
+ protected void processDiscoveredSplits(
+ Optional<PlanWithNextSnapshotId> ignore, Throwable error) {
if (error != null) {
if (error instanceof EndOfScanException) {
// finished
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 063daf1b3..48f41b008 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -90,6 +90,7 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
scan,
bucketMode,
-
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis());
+
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
+ options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index dfc788403..489a1d4fc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -33,6 +33,7 @@ import
org.apache.flink.api.connector.source.SplitEnumeratorContext;
import
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;
+import org.assertj.core.api.Assertions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -750,6 +751,48 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(2L);
}
+ @Test
+ public void testEnumeratorSplitMax() throws Exception {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ getSplitEnumeratorContext(2);
+
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ StreamTableScan scan = new MockScan(results);
+ ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .withBucketMode(BucketMode.UNAWARE)
+ .build();
+ enumerator.start();
+
+ long snapshot = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ for (int i = 0; i < 16; i++) {
+ splits.add(createDataSplit(snapshot++, i,
Collections.emptyList()));
+ }
+ results.put(1L, new DataFilePlan(splits));
+ context.triggerAllActions();
+
+ splits = new ArrayList<>();
+ for (int i = 0; i < 16; i++) {
+ splits.add(createDataSplit(snapshot++, i,
Collections.emptyList()));
+ }
+ results.put(2L, new DataFilePlan(splits));
+ context.triggerAllActions();
+
+ splits = new ArrayList<>();
+ for (int i = 0; i < 16; i++) {
+ splits.add(createDataSplit(snapshot++, i,
Collections.emptyList()));
+ }
+ results.put(3L, new DataFilePlan(splits));
+ context.triggerAllActions();
+
+
Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(16
* 2);
+ }
+
private void triggerCheckpointAndComplete(
ContinuousFileSplitEnumerator enumerator, long checkpointId)
throws Exception {
enumerator.snapshotState(checkpointId);
@@ -825,7 +868,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
- context, initialSplits, null, discoveryInterval, scan,
bucketMode);
+ context, initialSplits, null, discoveryInterval, scan,
bucketMode, 10);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index 76358673d..24c8754f5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -246,7 +246,7 @@ public class AlignedContinuousFileSplitEnumeratorTest
extends FileSplitEnumerato
public AlignedContinuousFileSplitEnumerator build() {
return new AlignedContinuousFileSplitEnumerator(
- context, initialSplits, null, discoveryInterval, scan,
bucketMode, timeout);
+ context, initialSplits, null, discoveryInterval, scan,
bucketMode, timeout, 10);
}
}
}