This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 624a553fb60d5d35640bc89c876f66fc288bcd4a Author: YeJunHao <[email protected]> AuthorDate: Thu Dec 7 11:16:51 2023 +0800 [flink] Disable compaction topology when streaming with bounded input stream (#2463) --- .../main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 9 ++++++++- .../java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java | 1 + .../org/apache/paimon/flink/sink/UnawareBucketWriteSink.java | 8 ++++++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 56ead8a85..46a0c09fa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -44,6 +44,7 @@ public class FlinkSinkBuilder { @Nullable private Map<String, String> overwritePartition; @Nullable private LogSinkFunction logSinkFunction; @Nullable private Integer parallelism; + private boolean boundedInput = false; private boolean compactSink = false; public FlinkSinkBuilder(FileStoreTable table) { @@ -79,6 +80,11 @@ public class FlinkSinkBuilder { return this; } + public FlinkSinkBuilder withBoundedInputStream(boolean bounded) { + this.boundedInput = bounded; + return this; + } + public FlinkSinkBuilder forCompact(boolean compactSink) { this.compactSink = compactSink; return this; @@ -142,7 +148,8 @@ public class FlinkSinkBuilder { (AppendOnlyFileStoreTable) table, overwritePartition, logSinkFunction, - parallelism) + parallelism, + boundedInput) .sinkFrom(input); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index be6628bde..3874e2ca7 100644 --- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -133,6 +133,7 @@ public abstract class FlinkTableSinkBase .withLogSinkFunction(logSinkFunction) .withOverwritePartition(overwrite ? staticPartitions : null) .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM)) + .withBoundedInputStream(context.isBounded()) .build()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java index 856432582..802a98dcf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java @@ -40,16 +40,19 @@ public class UnawareBucketWriteSink extends FileStoreSink { private final boolean enableCompaction; private final AppendOnlyFileStoreTable table; private final Integer parallelism; + private final boolean boundedInput; public UnawareBucketWriteSink( AppendOnlyFileStoreTable table, Map<String, String> overwritePartitions, LogSinkFunction logSinkFunction, - Integer parallelism) { + Integer parallelism, + boolean boundedInput) { super(table, overwritePartitions, logSinkFunction); this.table = table; this.enableCompaction = !table.coreOptions().writeOnly(); this.parallelism = parallelism; + this.boundedInput = boundedInput; } @Override @@ -63,7 +66,8 @@ public class UnawareBucketWriteSink extends FileStoreSink { .get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; // if enable compaction, we need to add compaction topology to this job - if (enableCompaction && isStreamingMode) { + if (enableCompaction && isStreamingMode && !boundedInput) { + // if streaming mode with bounded 
input, we disable compaction topology UnawareBucketCompactionTopoBuilder builder = new UnawareBucketCompactionTopoBuilder( input.getExecutionEnvironment(), table.name(), table);
