This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 54c7ddd76 [flink] Support Flink adaptive parallelism by default (#3937)
54c7ddd76 is described below

commit 54c7ddd7669091b11fe3cd4f541e72eb8d3b15dd
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 13:08:45 2024 +0800

    [flink] Support Flink adaptive parallelism by default (#3937)
---
 .../org/apache/paimon/flink/sink/CombinedTableCompactorSink.java    | 6 +++---
 .../src/main/java/org/apache/paimon/flink/sink/FlinkSink.java       | 6 +++---
 .../src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java | 5 +++++
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index ad3d57771..79a1e6a9f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -44,7 +44,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
-import static org.apache.paimon.flink.sink.FlinkSink.assertBatchConfiguration;
+import static org.apache.paimon.flink.sink.FlinkSink.assertBatchAdaptiveParallelism;
 import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
 import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
 
@@ -117,8 +117,8 @@ public class CombinedTableCompactorSink implements Serializable {
                         .setParallelism(unawareBucketTableSource.getParallelism());
 
         if (!isStreaming) {
-            assertBatchConfiguration(env, multiBucketTableRewriter.getParallelism());
-            assertBatchConfiguration(env, unawareBucketTableRewriter.getParallelism());
+            assertBatchAdaptiveParallelism(env, multiBucketTableRewriter.getParallelism());
+            assertBatchAdaptiveParallelism(env, unawareBucketTableRewriter.getParallelism());
         }
 
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 865b2a939..cfb8353ab 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -222,8 +222,8 @@ public abstract class FlinkSink<T> implements Serializable {
                                         commitUser))
                         .setParallelism(parallelism == null ? input.getParallelism() : parallelism);
 
-        if (!isStreaming) {
-            assertBatchConfiguration(env, written.getParallelism());
+        if (!isStreaming && table.coreOptions().writeManifestCache().getBytes() > 0) {
+            assertBatchAdaptiveParallelism(env, written.getParallelism());
         }
 
         Options options = Options.fromMap(table.options());
@@ -314,7 +314,7 @@ public abstract class FlinkSink<T> implements Serializable {
                         + " to exactly-once");
     }
 
-    public static void assertBatchConfiguration(
+    public static void assertBatchAdaptiveParallelism(
             StreamExecutionEnvironment env, int sinkParallelism) {
         try {
             checkArgument(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index bbc827b24..f747a7fb6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -51,6 +51,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
 
     @Test
     public void testAdaptiveParallelism() {
+        batchSql("ALTER TABLE T SET ('write-manifest-cache' = '1 mb')");
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
         assertThatThrownBy(() -> batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c"))
                 .hasMessageContaining(
@@ -59,6 +60,10 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         // work fine
         batchSql(
                 "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT a, b, c FROM T GROUP BY a,b,c");
+
+        // work fine too
+        batchSql("ALTER TABLE T SET ('write-manifest-cache' = '0 b')");
+        batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c");
     }
 
     @Test

Reply via email to