This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b0e238bc86 [Flink] Add option to avoid shuffle in bucket unaware
append sink (#7717)
b0e238bc86 is described below
commit b0e238bc86695bb230e8abfe68a1b2cdf091fe24
Author: Nick Del Nano <[email protected]>
AuthorDate: Thu May 7 04:12:26 2026 -0700
[Flink] Add option to avoid shuffle in bucket unaware append sink (#7717)
Bucket-unaware append tables [1] are a great choice for streaming events
into Paimon format for batch consumers. These streams can be very high
throughput, for example clickstream data. Currently the writer introduces
a shuffle (added in https://github.com/apache/paimon/pull/4203), and in my
production use cases (Kafka --> Paimon) this shuffles a _lot_ of data.
The rebalance was added to avoid operator chaining; instead, we can use
[startNewChain](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups).
This avoids the deadlock issue described in that PR without a network shuffle.
---
.../generated/flink_connector_configuration.html | 6 ++++++
.../paimon/flink/action/cdc/SyncTableActionBase.java | 8 +++++++-
.../flink/action/cdc/kafka/KafkaSyncTableAction.java | 16 ++++++++++++++++
.../paimon/flink/sink/cdc/CdcAppendTableSink.java | 15 ++++++++++++++-
.../apache/paimon/flink/sink/cdc/CdcSinkBuilder.java | 9 +++++++++
.../flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java | 20 ++++++++++++++------
.../apache/paimon/flink/FlinkConnectorOptions.java | 12 ++++++++++++
7 files changed, 78 insertions(+), 8 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 253652cb78..359b8dba98 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -362,5 +362,11 @@ under the License.
<td>Integer</td>
<td>Defines a custom parallelism for the unaware-bucket table
compaction job. By default, if this option is not defined, the planner will
derive the parallelism for each statement individually by also considering the
global configuration.</td>
</tr>
+ <tr>
+ <td><h5>unaware-bucket.no-shuffle</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, the CDC sync pipeline will skip the network shuffle
between source and writer operators. This is only supported for bucket-unaware
(append) tables where each writer subtask independently appends data without
bucket ownership constraints. This eliminates data transfer overhead when the
source already provides suitable data distribution (e.g., Kafka
partitions).</td>
+ </tr>
</tbody>
</table>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index 0f984d3654..c3d2e94bca 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -163,6 +163,12 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
protected void buildSink(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
+ createCdcSinkBuilder(input, parserFactory).build();
+ }
+
+ protected CdcSinkBuilder<RichCdcMultiplexRecord> createCdcSinkBuilder(
+ DataStream<RichCdcMultiplexRecord> input,
+ EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new CdcSinkBuilder<RichCdcMultiplexRecord>()
.withInput(input)
@@ -175,7 +181,7 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
}
- sinkBuilder.build();
+ return sinkBuilder;
}
private void checkConstraints() {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index bff61ab0cd..9c3f18f707 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -18,8 +18,13 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
import java.util.Map;
@@ -33,4 +38,15 @@ public class KafkaSyncTableAction extends
MessageQueueSyncTableActionBase {
Map<String, String> kafkaConfig) {
super(database, table, catalogConfig, kafkaConfig,
SyncJobHandler.SourceType.KAFKA);
}
+
+ @Override
+ protected void buildSink(
+ DataStream<RichCdcMultiplexRecord> input,
+ EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
+ boolean noShuffle =
+ Boolean.parseBoolean(
+ tableConfig.getOrDefault(
+
FlinkConnectorOptions.UNAWARE_BUCKET_NO_SHUFFLE.key(), "false"));
+ createCdcSinkBuilder(input,
parserFactory).withNoShuffle(noShuffle).build();
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
index 7a5b2a0f11..6694601bc7 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import javax.annotation.Nullable;
@@ -37,10 +38,16 @@ import javax.annotation.Nullable;
public class CdcAppendTableSink extends FlinkWriteSink<CdcRecord> {
private final Integer parallelism;
+ private final boolean noShuffle;
public CdcAppendTableSink(FileStoreTable table, Integer parallelism) {
+ this(table, parallelism, false);
+ }
+
+ public CdcAppendTableSink(FileStoreTable table, Integer parallelism,
boolean noShuffle) {
super(table, null);
this.parallelism = parallelism;
+ this.noShuffle = noShuffle;
}
@Override
@@ -52,7 +59,13 @@ public class CdcAppendTableSink extends
FlinkWriteSink<CdcRecord> {
@Override
public DataStream<Committable> doWrite(
DataStream<CdcRecord> input, String initialCommitUser, @Nullable
Integer parallelism) {
- return super.doWrite(input, initialCommitUser, this.parallelism);
+ DataStream<Committable> written = super.doWrite(input,
initialCommitUser, this.parallelism);
+ if (noShuffle) {
+ // Break operator chaining between parse and write to avoid
deadlock
+ // during schema evolution retries, without introducing a network
shuffle.
+ ((SingleOutputStreamOperator<Committable>)
written).startNewChain();
+ }
+ return written;
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index e4518d3a26..3ae0deb548 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -54,6 +54,7 @@ public class CdcSinkBuilder<T> {
private TypeMapping typeMapping = null;
@Nullable private Integer parallelism;
+ private boolean noShuffle = false;
public CdcSinkBuilder<T> withInput(DataStream<T> input) {
this.input = input;
@@ -90,6 +91,11 @@ public class CdcSinkBuilder<T> {
return this;
}
+ public CdcSinkBuilder<T> withNoShuffle(boolean noShuffle) {
+ this.noShuffle = noShuffle;
+ return this;
+ }
+
public DataStreamSink<?> build() {
Preconditions.checkNotNull(input, "Input DataStream can not be null.");
Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not
be null.");
@@ -160,6 +166,9 @@ public class CdcSinkBuilder<T> {
private DataStreamSink<?> buildForUnawareBucket(DataStream<CdcRecord>
parsed) {
FileStoreTable dataTable = (FileStoreTable) table;
+ if (noShuffle) {
+ return new CdcAppendTableSink(dataTable, parallelism,
true).sinkFrom(parsed);
+ }
// rebalance it to make sure schema change work to avoid infinite loop
return new CdcAppendTableSink(dataTable,
parallelism).sinkFrom(parsed.rebalance());
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index ea02652971..c7e66452a9 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -71,36 +71,43 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
@Test
@Timeout(120)
public void testRandomCdcEvents() throws Exception {
- innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1,
false, false);
+ innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1,
false, false, false);
}
@Test
@Timeout(120)
public void testRandomCdcEventsDynamicBucket() throws Exception {
- innerTestRandomCdcEvents(-1, false, false);
+ innerTestRandomCdcEvents(-1, false, false, false);
}
@Test
@Timeout(120)
public void testRandomCdcEventsPostponeBucket() throws Exception {
- innerTestRandomCdcEvents(BucketMode.POSTPONE_BUCKET, false, false);
+ innerTestRandomCdcEvents(BucketMode.POSTPONE_BUCKET, false, false,
false);
}
@Disabled
@Test
@Timeout(120)
public void testRandomCdcEventsGlobalDynamicBucket() throws Exception {
- innerTestRandomCdcEvents(-1, true, false);
+ innerTestRandomCdcEvents(-1, true, false, false);
}
@Test
@Timeout(120)
public void testRandomCdcEventsUnawareBucket() throws Exception {
- innerTestRandomCdcEvents(-1, false, true);
+ innerTestRandomCdcEvents(-1, false, true, false);
+ }
+
+ @Test
+ @Timeout(120)
+ public void testRandomCdcEventsUnawareBucketNoShuffle() throws Exception {
+ innerTestRandomCdcEvents(-1, false, true, true);
}
private void innerTestRandomCdcEvents(
- int numBucket, boolean globalIndex, boolean unawareBucketMode)
throws Exception {
+ int numBucket, boolean globalIndex, boolean unawareBucketMode,
boolean noShuffle)
+ throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numEvents = random.nextInt(1500) + 1;
@@ -177,6 +184,7 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
.withParallelism(3)
.withIdentifier(Identifier.create(DATABASE_NAME, TABLE_NAME))
.withCatalogLoader(catalogLoader)
+ .withNoShuffle(noShuffle)
.build();
// enable failure when running jobs if needed
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 89d3e2174a..61c741fee2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -72,6 +72,18 @@ public class FlinkConnectorOptions {
+ "By default, if this option is not
defined, the planner will derive the parallelism "
+ "for each statement individually by also
considering the global configuration.");
+ public static final ConfigOption<Boolean> UNAWARE_BUCKET_NO_SHUFFLE =
+ ConfigOptions.key("unaware-bucket.no-shuffle")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, the CDC sync pipeline will skip the
network shuffle between "
+ + "source and writer operators. This is
only supported for "
+ + "bucket-unaware (append) tables where
each writer subtask "
+ + "independently appends data without
bucket ownership constraints. "
+ + "This eliminates data transfer overhead
when the source already "
+ + "provides suitable data distribution
(e.g., Kafka partitions).");
+
public static final ConfigOption<Boolean> INFER_SCAN_PARALLELISM =
ConfigOptions.key("scan.infer-parallelism")
.booleanType()