This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f17331a00 [flink] Prohibit adaptive parallelism (#1465)
f17331a00 is described below
commit f17331a009ea4779d45d6d6fcaed9d4aae26756f
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 30 16:21:07 2023 +0800
[flink] Prohibit adaptive parallelism (#1465)
---
.../paimon/flink/sink/AdaptiveParallelism.java | 29 ++++++++++++++++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 34 +++++++++++++++-------
.../apache/paimon/flink/BatchFileStoreITCase.java | 12 ++++++++
3 files changed, 65 insertions(+), 10 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
new file mode 100644
index 000000000..0b35f5f44
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** Reads Flink's adaptive batch parallelism configuration. Only works with Flink 1.17+. */
+public final class AdaptiveParallelism {
+
+    private AdaptiveParallelism() {}
+
+    /**
+     * Returns whether Flink's adaptive auto-parallelism is enabled in the given environment, as
+     * configured by {@link BatchExecutionOptions#ADAPTIVE_AUTO_PARALLELISM_ENABLED}.
+     *
+     * <p>The option only exists since Flink 1.17. On older Flink versions, invoking this method
+     * fails with a linkage error (e.g. {@link NoClassDefFoundError}), which callers are expected
+     * to catch.
+     *
+     * @param env the execution environment whose configuration is read
+     * @return {@code true} if adaptive auto-parallelism is enabled
+     */
+    public static boolean isEnabled(StreamExecutionEnvironment env) {
+        return env.getConfiguration().get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 292661f1d..94f7cf63b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -24,7 +24,6 @@ import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableFunction;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -49,6 +48,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_F
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Abstract sink of paimon. */
public abstract class FlinkSink<T> implements Serializable {
@@ -139,8 +139,9 @@ public abstract class FlinkSink<T> implements Serializable {
public SingleOutputStreamOperator<Committable> doWrite(
DataStream<T> input, String commitUser, Integer parallelism) {
+ StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming =
-
StreamExecutionEnvironmentUtils.getConfiguration(input.getExecutionEnvironment())
+ StreamExecutionEnvironmentUtils.getConfiguration(env)
.get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
@@ -149,12 +150,14 @@ public abstract class FlinkSink<T> implements Serializable {
WRITER_NAME + " -> " + table.name(),
new CommittableTypeInfo(),
createWriteOperator(
- createWriteProvider(
- input.getExecutionEnvironment()
- .getCheckpointConfig(),
- isStreaming),
+
createWriteProvider(env.getCheckpointConfig(), isStreaming),
commitUser))
.setParallelism(parallelism == null ?
input.getParallelism() : parallelism);
+
+ if (!isStreaming) {
+ assertBatchConfiguration(env, written.getParallelism());
+ }
+
Options options = Options.fromMap(table.options());
if (options.get(SINK_USE_MANAGED_MEMORY)) {
MemorySize memorySize =
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
@@ -174,7 +177,7 @@ public abstract class FlinkSink<T> implements Serializable {
boolean streamingCheckpointEnabled =
isStreaming && checkpointConfig.isCheckpointingEnabled();
if (streamingCheckpointEnabled) {
- assertCheckpointConfiguration(env);
+ assertStreamingConfiguration(env);
}
SingleOutputStreamOperator<?> committed =
@@ -191,19 +194,30 @@ public abstract class FlinkSink<T> implements Serializable {
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
}
-    private void assertCheckpointConfiguration(StreamExecutionEnvironment env) {
-        Preconditions.checkArgument(
+    /**
+     * Validates the checkpoint settings the Paimon sink requires when streaming with
+     * checkpointing enabled.
+     *
+     * <p>Rejects, via {@code checkArgument}, unaligned checkpoints and any checkpointing mode
+     * other than {@link CheckpointingMode#EXACTLY_ONCE}.
+     *
+     * @param env the execution environment whose checkpoint config is inspected
+     */
+    private void assertStreamingConfiguration(StreamExecutionEnvironment env) {
+        checkArgument(
                 !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
                 "Paimon sink currently does not support unaligned checkpoints. Please set "
                         + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
                         + " to false.");
-        Preconditions.checkArgument(
+        checkArgument(
                 env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
                 "Paimon sink currently only supports EXACTLY_ONCE checkpoint mode. Please set "
                         + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                         + " to exactly-once");
     }
+    /**
+     * Rejects batch writes whose writer parallelism is left unset while Flink's adaptive
+     * auto-parallelism is enabled, because the Paimon sink does not support that mode.
+     *
+     * @param env the execution environment whose configuration is inspected
+     * @param sinkParallelism parallelism of the writer operator; {@code -1} is treated as "not set
+     *     explicitly" (presumably Flink's default-parallelism marker — confirm)
+     */
+    private void assertBatchConfiguration(StreamExecutionEnvironment env, int sinkParallelism) {
+        try {
+            // Short-circuits: an explicit parallelism never needs the adaptive-config lookup.
+            checkArgument(
+                    sinkParallelism != -1 || !AdaptiveParallelism.isEnabled(env),
+                    "Paimon Sink does not support Flink's Adaptive Parallelism mode. "
+                            + "Please manually turn it off or set Paimon `sink.parallelism` manually.");
+        } catch (NoClassDefFoundError | NoSuchFieldError ignored) {
+            // Adaptive parallelism only exists since Flink 1.17. On older versions the option
+            // cannot be linked (missing class or missing field), so there is nothing to check.
+        }
+    }
+
protected abstract OneInputStreamOperator<T, Committable>
createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 928717d3c..b908e275a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -37,6 +37,18 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT,
b INT, c INT)");
}
+    @Test
+    public void testAdaptiveParallelism() {
+        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
+
+        // The aggregating query does not pin the writer's parallelism, so the sink presumably sees
+        // it as unset and rejects it while Flink's adaptive parallelism is enabled.
+        assertThatThrownBy(() -> batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c"))
+                .hasMessageContaining(
+                        "Paimon Sink does not support Flink's Adaptive Parallelism mode.");
+
+        // Setting 'sink.parallelism' explicitly satisfies the check, so the same query succeeds.
+        batchSql(
+                "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT a, b, c FROM T GROUP BY a,b,c");
+    }
+
@Test
public void testOverwriteEmpty() {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");