This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f17331a00 [flink] Prohibit adaptive parallelism (#1465)
f17331a00 is described below

commit f17331a009ea4779d45d6d6fcaed9d4aae26756f
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 30 16:21:07 2023 +0800

    [flink] Prohibit adaptive parallelism (#1465)
---
 .../paimon/flink/sink/AdaptiveParallelism.java     | 29 ++++++++++++++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 34 +++++++++++++++-------
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 12 ++++++++
 3 files changed, 65 insertions(+), 10 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
new file mode 100644
index 000000000..0b35f5f44
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** Get adaptive config from Flink. Only work for Flink 1.17+. */
+public class AdaptiveParallelism {
+
+    public static boolean isEnabled(StreamExecutionEnvironment env) {
+        return 
env.getConfiguration().get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 292661f1d..94f7cf63b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -24,7 +24,6 @@ import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -49,6 +48,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_F
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Abstract sink of paimon. */
 public abstract class FlinkSink<T> implements Serializable {
@@ -139,8 +139,9 @@ public abstract class FlinkSink<T> implements Serializable {
 
     public SingleOutputStreamOperator<Committable> doWrite(
             DataStream<T> input, String commitUser, Integer parallelism) {
+        StreamExecutionEnvironment env = input.getExecutionEnvironment();
         boolean isStreaming =
-                
StreamExecutionEnvironmentUtils.getConfiguration(input.getExecutionEnvironment())
+                StreamExecutionEnvironmentUtils.getConfiguration(env)
                                 .get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
 
@@ -149,12 +150,14 @@ public abstract class FlinkSink<T> implements Serializable {
                                 WRITER_NAME + " -> " + table.name(),
                                 new CommittableTypeInfo(),
                                 createWriteOperator(
-                                        createWriteProvider(
-                                                input.getExecutionEnvironment()
-                                                        .getCheckpointConfig(),
-                                                isStreaming),
+                                        
createWriteProvider(env.getCheckpointConfig(), isStreaming),
                                         commitUser))
                         .setParallelism(parallelism == null ? 
input.getParallelism() : parallelism);
+
+        if (!isStreaming) {
+            assertBatchConfiguration(env, written.getParallelism());
+        }
+
         Options options = Options.fromMap(table.options());
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
             MemorySize memorySize = 
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
@@ -174,7 +177,7 @@ public abstract class FlinkSink<T> implements Serializable {
         boolean streamingCheckpointEnabled =
                 isStreaming && checkpointConfig.isCheckpointingEnabled();
         if (streamingCheckpointEnabled) {
-            assertCheckpointConfiguration(env);
+            assertStreamingConfiguration(env);
         }
 
         SingleOutputStreamOperator<?> committed =
@@ -191,19 +194,30 @@ public abstract class FlinkSink<T> implements Serializable {
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
-    private void assertCheckpointConfiguration(StreamExecutionEnvironment env) 
{
-        Preconditions.checkArgument(
+    private void assertStreamingConfiguration(StreamExecutionEnvironment env) {
+        checkArgument(
                 !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
                 "Paimon sink currently does not support unaligned checkpoints. 
Please set "
                         + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
                         + " to false.");
-        Preconditions.checkArgument(
+        checkArgument(
                 env.getCheckpointConfig().getCheckpointingMode() == 
CheckpointingMode.EXACTLY_ONCE,
                 "Paimon sink currently only supports EXACTLY_ONCE checkpoint 
mode. Please set "
                         + 
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                         + " to exactly-once");
     }
 
+    private void assertBatchConfiguration(StreamExecutionEnvironment env, int 
sinkParallelism) {
+        try {
+            checkArgument(
+                    sinkParallelism != -1 || 
!AdaptiveParallelism.isEnabled(env),
+                    "Paimon Sink does not support Flink's Adaptive Parallelism 
mode. "
+                            + "Please manually turn it off or set Paimon 
`sink.parallelism` manually.");
+        } catch (NoClassDefFoundError ignored) {
+            // before 1.17, there is no adaptive parallelism
+        }
+    }
+
     protected abstract OneInputStreamOperator<T, Committable> 
createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser);
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 928717d3c..b908e275a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -37,6 +37,18 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, 
b INT, c INT)");
     }
 
+    @Test
+    public void testAdaptiveParallelism() {
+        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
+        assertThatThrownBy(() -> batchSql("INSERT INTO T SELECT a, b, c FROM T 
GROUP BY a,b,c"))
+                .hasMessageContaining(
+                        "Paimon Sink does not support Flink's Adaptive 
Parallelism mode.");
+
+        // work fine
+        batchSql(
+                "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT 
a, b, c FROM T GROUP BY a,b,c");
+    }
+
     @Test
     public void testOverwriteEmpty() {
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");

Reply via email to