This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 54c7ddd76 [flink] Support Flink adaptive parallelism by default (#3937)
54c7ddd76 is described below
commit 54c7ddd7669091b11fe3cd4f541e72eb8d3b15dd
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 13:08:45 2024 +0800
[flink] Support Flink adaptive parallelism by default (#3937)
---
.../org/apache/paimon/flink/sink/CombinedTableCompactorSink.java | 6 +++---
.../src/main/java/org/apache/paimon/flink/sink/FlinkSink.java | 6 +++---
.../src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java | 5 +++++
3 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index ad3d57771..79a1e6a9f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -44,7 +44,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
-import static org.apache.paimon.flink.sink.FlinkSink.assertBatchConfiguration;
+import static
org.apache.paimon.flink.sink.FlinkSink.assertBatchAdaptiveParallelism;
import static
org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
import static
org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
@@ -117,8 +117,8 @@ public class CombinedTableCompactorSink implements Serializable {
.setParallelism(unawareBucketTableSource.getParallelism());
if (!isStreaming) {
- assertBatchConfiguration(env,
multiBucketTableRewriter.getParallelism());
- assertBatchConfiguration(env,
unawareBucketTableRewriter.getParallelism());
+ assertBatchAdaptiveParallelism(env,
multiBucketTableRewriter.getParallelism());
+ assertBatchAdaptiveParallelism(env,
unawareBucketTableRewriter.getParallelism());
}
if (options.get(SINK_USE_MANAGED_MEMORY)) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 865b2a939..cfb8353ab 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -222,8 +222,8 @@ public abstract class FlinkSink<T> implements Serializable {
commitUser))
.setParallelism(parallelism == null ?
input.getParallelism() : parallelism);
- if (!isStreaming) {
- assertBatchConfiguration(env, written.getParallelism());
+ if (!isStreaming &&
table.coreOptions().writeManifestCache().getBytes() > 0) {
+ assertBatchAdaptiveParallelism(env, written.getParallelism());
}
Options options = Options.fromMap(table.options());
@@ -314,7 +314,7 @@ public abstract class FlinkSink<T> implements Serializable {
+ " to exactly-once");
}
- public static void assertBatchConfiguration(
+ public static void assertBatchAdaptiveParallelism(
StreamExecutionEnvironment env, int sinkParallelism) {
try {
checkArgument(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index bbc827b24..f747a7fb6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -51,6 +51,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
@Test
public void testAdaptiveParallelism() {
+ batchSql("ALTER TABLE T SET ('write-manifest-cache' = '1 mb')");
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
assertThatThrownBy(() -> batchSql("INSERT INTO T SELECT a, b, c FROM T
GROUP BY a,b,c"))
.hasMessageContaining(
@@ -59,6 +60,10 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
// work fine
batchSql(
"INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT
a, b, c FROM T GROUP BY a,b,c");
+
+ // work fine too
+ batchSql("ALTER TABLE T SET ('write-manifest-cache' = '0 b')");
+ batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c");
}
@Test