This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c8e6cb7eb [flink] Set default parallelism for sink in cases where AQE not supported (#4221)
c8e6cb7eb is described below

commit c8e6cb7eb92fc65b023e41d6f7a4f8f25bb39e7d
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Tue Sep 24 11:57:23 2024 +0800

    [flink] Set default parallelism for sink in cases where AQE not supported (#4221)
---
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 30 -------------
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 51 +++++++++++++++++++++-
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 26 ++++-------
 3 files changed, 59 insertions(+), 48 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f810c464b..e483e3c19 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -24,7 +24,6 @@ import org.apache.paimon.CoreOptions.TagCreationMode;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializableRunnable;
@@ -48,10 +47,8 @@ import 
org.apache.flink.table.api.config.ExecutionConfigOptions;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 
@@ -225,13 +222,6 @@ public abstract class FlinkSink<T> implements Serializable 
{
                                         commitUser))
                         .setParallelism(parallelism == null ? 
input.getParallelism() : parallelism);
 
-        boolean writeMCacheEnabled = 
table.coreOptions().writeManifestCache().getBytes() > 0;
-        boolean hashDynamicMode = table.bucketMode() == 
BucketMode.HASH_DYNAMIC;
-        if (!isStreaming && (writeMCacheEnabled || hashDynamicMode)) {
-            assertBatchAdaptiveParallelism(
-                    env, written.getParallelism(), writeMCacheEnabled, 
hashDynamicMode);
-        }
-
         Options options = Options.fromMap(table.options());
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
             declareManagedMemory(written, 
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
@@ -328,26 +318,6 @@ public abstract class FlinkSink<T> implements Serializable 
{
         assertBatchAdaptiveParallelism(env, sinkParallelism, msg);
     }
 
-    public static void assertBatchAdaptiveParallelism(
-            StreamExecutionEnvironment env,
-            int sinkParallelism,
-            boolean writeMCacheEnabled,
-            boolean hashDynamicMode) {
-        List<String> messages = new ArrayList<>();
-        if (writeMCacheEnabled) {
-            messages.add("Write Manifest Cache");
-        }
-        if (hashDynamicMode) {
-            messages.add("Dynamic Bucket Mode");
-        }
-        String msg =
-                String.format(
-                        "Paimon Sink with %s does not support Flink's Adaptive 
Parallelism mode. "
-                                + "Please manually turn it off or set Paimon 
`sink.parallelism` manually.",
-                        messages);
-        assertBatchAdaptiveParallelism(env, sinkParallelism, msg);
-    }
-
     public static void assertBatchAdaptiveParallelism(
             StreamExecutionEnvironment env, int sinkParallelism, String 
exceptionMsg) {
         try {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 546f82ec1..fb9cb1959 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -44,12 +44,14 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -75,7 +77,7 @@ public class FlinkSinkBuilder {
 
     private DataStream<RowData> input;
     @Nullable protected Map<String, String> overwritePartition;
-    @Nullable protected Integer parallelism;
+    @Nullable private Integer parallelism;
     @Nullable private TableSortInfo tableSortInfo;
 
     // ============== for extension ==============
@@ -208,6 +210,7 @@ public class FlinkSinkBuilder {
 
     /** Build {@link DataStreamSink}. */
     public DataStreamSink<?> build() {
+        parallelism = checkAndUpdateParallelism(input, parallelism);
         input = trySortInput(input);
         DataStream<InternalRow> input = mapToInternalRow(this.input, 
table.rowType());
         if (table.coreOptions().localMergeEnabled() && 
table.schema().primaryKeys().size() > 0) {
@@ -282,4 +285,50 @@ public class FlinkSinkBuilder {
         }
         return input;
     }
+
+    private Integer checkAndUpdateParallelism(DataStream<?> input, Integer 
parallelism) {
+        try {
+            boolean parallelismUndefined = parallelism == null || parallelism 
== -1;
+            boolean isStreaming = FlinkSink.isStreaming(input);
+            boolean isAdaptiveParallelismEnabled =
+                    
AdaptiveParallelism.isEnabled(input.getExecutionEnvironment());
+            boolean writeMCacheEnabled = 
table.coreOptions().writeManifestCache().getBytes() > 0;
+            boolean hashDynamicMode = table.bucketMode() == 
BucketMode.HASH_DYNAMIC;
+            if (parallelismUndefined
+                    && !isStreaming
+                    && isAdaptiveParallelismEnabled
+                    && (writeMCacheEnabled || hashDynamicMode)) {
+                List<String> messages = new ArrayList<>();
+                if (writeMCacheEnabled) {
+                    messages.add("Write Manifest Cache");
+                }
+                if (hashDynamicMode) {
+                    messages.add("Dynamic Bucket Mode");
+                }
+
+                String parallelismSource;
+                if (input.getParallelism() > 0) {
+                    parallelismSource = "input parallelism";
+                    parallelism = input.getParallelism();
+                } else {
+                    parallelismSource = DEFAULT_PARALLELISM.key();
+                    parallelism =
+                            input.getExecutionEnvironment()
+                                    .getConfiguration()
+                                    .get(DEFAULT_PARALLELISM);
+                }
+                String msg =
+                        String.format(
+                                "Paimon Sink with %s does not support Flink's 
Adaptive Parallelism mode. "
+                                        + "Configuring sink parallelism to 
`%s` instead. You can also set Paimon "
+                                        + "`sink.parallelism` manually to 
override this configuration.",
+                                messages, parallelismSource);
+                LOG.warn(msg);
+            }
+            return parallelism;
+        } catch (NoClassDefFoundError ignored) {
+            // before 1.17, there is no adaptive parallelism
+            return parallelism;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index eea8e3a3c..c60443a12 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -53,30 +53,22 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
     public void testAQEWithWriteManifest() {
         batchSql("ALTER TABLE T SET ('write-manifest-cache' = '1 mb')");
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThatThrownBy(() -> batchSql("INSERT INTO T SELECT a, b, c FROM T 
GROUP BY a,b,c"))
-                .hasMessageContaining(
-                        "Paimon Sink with [Write Manifest Cache] does not 
support Flink's Adaptive Parallelism mode.");
-
-        // work fine
-        batchSql(
-                "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT 
a, b, c FROM T GROUP BY a,b,c");
-
-        // work fine too
-        batchSql("ALTER TABLE T SET ('write-manifest-cache' = '0 b')");
         batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c");
+        assertThat(batchSql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222));
     }
 
     @Test
     public void testAQEWithDynamicBucket() {
         batchSql("CREATE TABLE IF NOT EXISTS D_T (a INT PRIMARY KEY NOT 
ENFORCED, b INT, c INT)");
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThatThrownBy(() -> batchSql("INSERT INTO D_T SELECT a, b, c FROM 
T GROUP BY a,b,c"))
-                .hasMessageContaining(
-                        "Paimon Sink with [Dynamic Bucket Mode] does not 
support Flink's Adaptive Parallelism mode.");
-
-        // work fine
-        batchSql(
-                "INSERT INTO D_T /*+ OPTIONS('sink.parallelism'='1') */ SELECT 
a, b, c FROM T GROUP BY a,b,c");
+        batchSql("INSERT INTO D_T SELECT a, b, c FROM T GROUP BY a,b,c");
+        assertThat(batchSql("SELECT * FROM D_T"))
+                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 
222));
     }
 
     @Test

Reply via email to