This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 037dc54bb [core] Remove bounded check for append compact topology (#3948)
037dc54bb is described below
commit 037dc54bbbc3f9553c26cddae5b4e00231fa8198
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 17:39:53 2024 +0800
[core] Remove bounded check for append compact topology (#3948)
---
.../paimon/flink/sink/cdc/CdcUnawareBucketSink.java | 2 +-
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 21 ++-------------------
.../paimon/flink/sink/FlinkTableSinkBase.java | 1 -
.../paimon/flink/sink/RowUnawareBucketSink.java | 5 ++---
.../apache/paimon/flink/sink/UnawareBucketSink.java | 7 ++-----
5 files changed, 7 insertions(+), 29 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
index 5cd3d6085..c60772d10 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
@@ -29,7 +29,7 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
public class CdcUnawareBucketSink extends UnawareBucketSink<CdcRecord> {
public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
- super(table, null, null, parallelism, false);
+ super(table, null, null, parallelism);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index dd73bd559..546f82ec1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -76,7 +76,6 @@ public class FlinkSinkBuilder {
private DataStream<RowData> input;
@Nullable protected Map<String, String> overwritePartition;
@Nullable protected Integer parallelism;
- private Boolean boundedInput = null;
@Nullable private TableSortInfo tableSortInfo;
// ============== for extension ==============
@@ -131,15 +130,6 @@ public class FlinkSinkBuilder {
return this;
}
- /**
- * Set input bounded, if it is bounded, append table sink does not
generate a topology for
- * merging small files.
- */
- public FlinkSinkBuilder inputBounded(boolean bounded) {
- this.boundedInput = bounded;
- return this;
- }
-
/** Clustering the input data if possible. */
public FlinkSinkBuilder clusteringIfPossible(
String clusteringColumns,
@@ -152,10 +142,7 @@ public class FlinkSinkBuilder {
return this;
}
checkState(input != null, "The input stream should be specified
earlier.");
- if (boundedInput == null) {
- boundedInput = !FlinkSink.isStreaming(input);
- }
- if (!boundedInput || !table.bucketMode().equals(BUCKET_UNAWARE)) {
+ if (FlinkSink.isStreaming(input) ||
!table.bucketMode().equals(BUCKET_UNAWARE)) {
LOG.warn(
"Clustering is enabled; however, it has been skipped as "
+ "it only supports the bucket unaware table
without primary keys and "
@@ -282,11 +269,7 @@ public class FlinkSinkBuilder {
checkArgument(
table.primaryKeys().isEmpty(),
"Unaware bucket mode only works with append-only table for
now.");
- if (boundedInput == null) {
- boundedInput = !FlinkSink.isStreaming(input);
- }
- return new RowUnawareBucketSink(
- table, overwritePartition, logSinkFunction,
parallelism, boundedInput)
+ return new RowUnawareBucketSink(table, overwritePartition,
logSinkFunction, parallelism)
.sinkFrom(input);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index ebe1c4c51..e719b9a77 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -135,7 +135,6 @@ public abstract class FlinkTableSinkBase
new DataStream<>(
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()))
- .inputBounded(context.isBounded())
.clusteringIfPossible(
conf.get(CLUSTERING_COLUMNS),
conf.get(CLUSTERING_STRATEGY),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
index d7b8c76cf..b670b905d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
@@ -32,9 +32,8 @@ public class RowUnawareBucketSink extends
UnawareBucketSink<InternalRow> {
FileStoreTable table,
Map<String, String> overwritePartitions,
LogSinkFunction logSinkFunction,
- Integer parallelism,
- boolean boundedInput) {
- super(table, overwritePartitions, logSinkFunction, parallelism,
boundedInput);
+ Integer parallelism) {
+ super(table, overwritePartitions, logSinkFunction, parallelism);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
index c4ac99906..7c79d53b6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
@@ -42,19 +42,16 @@ public abstract class UnawareBucketSink<T> extends
FlinkWriteSink<T> {
protected final LogSinkFunction logSinkFunction;
@Nullable protected final Integer parallelism;
- protected final boolean boundedInput;
public UnawareBucketSink(
FileStoreTable table,
@Nullable Map<String, String> overwritePartitions,
LogSinkFunction logSinkFunction,
- @Nullable Integer parallelism,
- boolean boundedInput) {
+ @Nullable Integer parallelism) {
super(table, overwritePartitions);
this.table = table;
this.logSinkFunction = logSinkFunction;
this.parallelism = parallelism;
- this.boundedInput = boundedInput;
}
@Override
@@ -69,7 +66,7 @@ public abstract class UnawareBucketSink<T> extends
FlinkWriteSink<T> {
.get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
// if enable compaction, we need to add compaction topology to this job
- if (enableCompaction && isStreamingMode && !boundedInput) {
+ if (enableCompaction && isStreamingMode) {
written =
written.transform(
"Compact Coordinator: " + table.name(),