This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c8e6cb7eb [flink] Set default parallelism for sink in cases where AQE not supported (#4221)
c8e6cb7eb is described below
commit c8e6cb7eb92fc65b023e41d6f7a4f8f25bb39e7d
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Tue Sep 24 11:57:23 2024 +0800
[flink] Set default parallelism for sink in cases where AQE not supported (#4221)
---
.../org/apache/paimon/flink/sink/FlinkSink.java | 30 -------------
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 51 +++++++++++++++++++++-
.../apache/paimon/flink/BatchFileStoreITCase.java | 26 ++++-------
3 files changed, 59 insertions(+), 48 deletions(-)
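In short: before this change, a batch job with Flink's adaptive (AQE) parallelism enabled failed outright when the table used the write manifest cache or hash dynamic bucket mode; after it, FlinkSinkBuilder resolves a concrete sink parallelism instead (the input parallelism if known, otherwise Flink's `parallelism.default`) and logs a warning, while an explicit `sink.parallelism` still takes precedence. Below is a minimal sketch of that fallback decision, separate from the actual patch that follows; the class and method names are illustrative, not taken from the patch.

// Condensed sketch (not the exact patch code): how the sink parallelism fallback
// behaves after this change for a batch job where adaptive parallelism is enabled
// and the table uses write-manifest-cache or hash dynamic bucket mode.
public class SinkParallelismFallbackSketch {

    /**
     * @param configured         value of `sink.parallelism`, or null/-1 if unset
     * @param inputParallelism   parallelism of the input DataStream, or -1 if undefined
     * @param defaultParallelism Flink's `parallelism.default`
     */
    static int resolveSinkParallelism(Integer configured, int inputParallelism, int defaultParallelism) {
        boolean undefined = configured == null || configured == -1;
        if (!undefined) {
            return configured; // user-provided `sink.parallelism` always wins
        }
        // Instead of failing (old behavior), fall back to a concrete value and warn.
        return inputParallelism > 0 ? inputParallelism : defaultParallelism;
    }

    public static void main(String[] args) {
        System.out.println(resolveSinkParallelism(null, 4, 8));  // 4: input parallelism
        System.out.println(resolveSinkParallelism(null, -1, 8)); // 8: parallelism.default
        System.out.println(resolveSinkParallelism(2, 4, 8));     // 2: explicit sink.parallelism
    }
}

The real implementation additionally guards against Flink versions before 1.17, where the AdaptiveParallelism class does not exist, by catching NoClassDefFoundError and leaving the parallelism untouched.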
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f810c464b..e483e3c19 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -24,7 +24,6 @@ import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableRunnable;
@@ -48,10 +47,8 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
-import java.util.List;
import java.util.Queue;
import java.util.Set;
@@ -225,13 +222,6 @@ public abstract class FlinkSink<T> implements Serializable {
commitUser))
.setParallelism(parallelism == null ? input.getParallelism() : parallelism);
- boolean writeMCacheEnabled = table.coreOptions().writeManifestCache().getBytes() > 0;
- boolean hashDynamicMode = table.bucketMode() == BucketMode.HASH_DYNAMIC;
- if (!isStreaming && (writeMCacheEnabled || hashDynamicMode)) {
- assertBatchAdaptiveParallelism(
- env, written.getParallelism(), writeMCacheEnabled, hashDynamicMode);
- }
-
Options options = Options.fromMap(table.options());
if (options.get(SINK_USE_MANAGED_MEMORY)) {
declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
@@ -328,26 +318,6 @@ public abstract class FlinkSink<T> implements Serializable {
assertBatchAdaptiveParallelism(env, sinkParallelism, msg);
}
- public static void assertBatchAdaptiveParallelism(
- StreamExecutionEnvironment env,
- int sinkParallelism,
- boolean writeMCacheEnabled,
- boolean hashDynamicMode) {
- List<String> messages = new ArrayList<>();
- if (writeMCacheEnabled) {
- messages.add("Write Manifest Cache");
- }
- if (hashDynamicMode) {
- messages.add("Dynamic Bucket Mode");
- }
- String msg =
- String.format(
- "Paimon Sink with %s does not support Flink's Adaptive
Parallelism mode. "
- + "Please manually turn it off or set Paimon
`sink.parallelism` manually.",
- messages);
- assertBatchAdaptiveParallelism(env, sinkParallelism, msg);
- }
-
public static void assertBatchAdaptiveParallelism(
StreamExecutionEnvironment env, int sinkParallelism, String exceptionMsg) {
try {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 546f82ec1..fb9cb1959 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -44,12 +44,14 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -75,7 +77,7 @@ public class FlinkSinkBuilder {
private DataStream<RowData> input;
@Nullable protected Map<String, String> overwritePartition;
- @Nullable protected Integer parallelism;
+ @Nullable private Integer parallelism;
@Nullable private TableSortInfo tableSortInfo;
// ============== for extension ==============
@@ -208,6 +210,7 @@ public class FlinkSinkBuilder {
/** Build {@link DataStreamSink}. */
public DataStreamSink<?> build() {
+ parallelism = checkAndUpdateParallelism(input, parallelism);
input = trySortInput(input);
DataStream<InternalRow> input = mapToInternalRow(this.input, table.rowType());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
@@ -282,4 +285,50 @@ public class FlinkSinkBuilder {
}
return input;
}
+
+ private Integer checkAndUpdateParallelism(DataStream<?> input, Integer parallelism) {
+ try {
+ boolean parallelismUndefined = parallelism == null || parallelism == -1;
+ boolean isStreaming = FlinkSink.isStreaming(input);
+ boolean isAdaptiveParallelismEnabled =
+ AdaptiveParallelism.isEnabled(input.getExecutionEnvironment());
+ boolean writeMCacheEnabled = table.coreOptions().writeManifestCache().getBytes() > 0;
+ boolean hashDynamicMode = table.bucketMode() == BucketMode.HASH_DYNAMIC;
+ if (parallelismUndefined
+ && !isStreaming
+ && isAdaptiveParallelismEnabled
+ && (writeMCacheEnabled || hashDynamicMode)) {
+ List<String> messages = new ArrayList<>();
+ if (writeMCacheEnabled) {
+ messages.add("Write Manifest Cache");
+ }
+ if (hashDynamicMode) {
+ messages.add("Dynamic Bucket Mode");
+ }
+
+ String parallelismSource;
+ if (input.getParallelism() > 0) {
+ parallelismSource = "input parallelism";
+ parallelism = input.getParallelism();
+ } else {
+ parallelismSource = DEFAULT_PARALLELISM.key();
+ parallelism =
+ input.getExecutionEnvironment()
+ .getConfiguration()
+ .get(DEFAULT_PARALLELISM);
+ }
+ String msg =
+ String.format(
+ "Paimon Sink with %s does not support Flink's
Adaptive Parallelism mode. "
+ + "Configuring sink parallelism to
`%s` instead. You can also set Paimon "
+ + "`sink.parallelism` manually to
override this configuration.",
+ messages, parallelismSource);
+ LOG.warn(msg);
+ }
+ return parallelism;
+ } catch (NoClassDefFoundError ignored) {
+ // before 1.17, there is no adaptive parallelism
+ return parallelism;
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index eea8e3a3c..c60443a12 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -53,30 +53,22 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
public void testAQEWithWriteManifest() {
batchSql("ALTER TABLE T SET ('write-manifest-cache' = '1 mb')");
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
- assertThatThrownBy(() -> batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c"))
- .hasMessageContaining(
- "Paimon Sink with [Write Manifest Cache] does not support Flink's Adaptive Parallelism mode.");
-
- // work fine
- batchSql(
- "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT
a, b, c FROM T GROUP BY a,b,c");
-
- // work fine too
- batchSql("ALTER TABLE T SET ('write-manifest-cache' = '0 b')");
batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c");
+ assertThat(batchSql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222));
}
@Test
public void testAQEWithDynamicBucket() {
batchSql("CREATE TABLE IF NOT EXISTS D_T (a INT PRIMARY KEY NOT
ENFORCED, b INT, c INT)");
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
- assertThatThrownBy(() -> batchSql("INSERT INTO D_T SELECT a, b, c FROM T GROUP BY a,b,c"))
- .hasMessageContaining(
- "Paimon Sink with [Dynamic Bucket Mode] does not support Flink's Adaptive Parallelism mode.");
-
- // work fine
- batchSql(
- "INSERT INTO D_T /*+ OPTIONS('sink.parallelism'='1') */ SELECT
a, b, c FROM T GROUP BY a,b,c");
+ batchSql("INSERT INTO D_T SELECT a, b, c FROM T GROUP BY a,b,c");
+ assertThat(batchSql("SELECT * FROM D_T"))
+ .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
}
@Test