This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 744ea92bfb [flink] Writer operator support SlotSharingGroup settings (#5698)
744ea92bfb is described below
commit 744ea92bfb848a6b968abb58ad506d4389becf86
Author: yuzelin <[email protected]>
AuthorDate: Sat Jun 7 09:58:11 2025 +0800
[flink] Writer operator support SlotSharingGroup settings (#5698)
---
.../generated/flink_connector_configuration.html | 12 ++++++++++++
.../paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java | 11 +++++++++--
.../flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 6 ++++++
.../paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java | 2 ++
.../org/apache/paimon/flink/FlinkConnectorOptions.java | 12 ++++++++++++
.../main/java/org/apache/paimon/flink/sink/FlinkSink.java | 15 ++++++++++-----
6 files changed, 51 insertions(+), 7 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index b73ffc9dff..8b80512fc4 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -284,6 +284,18 @@ under the License.
<td>Boolean</td>
<td>If true, flink sink will use managed memory for merge tree;
otherwise, it will create an independent memory allocator.</td>
</tr>
+ <tr>
+ <td><h5>sink.writer-cpu</h5></td>
+ <td style="word-wrap: break-word;">1.0</td>
+ <td>Double</td>
+ <td>Sink writer cpu to control cpu cores of writer.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.writer-memory</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+ <td>Sink writer memory to control heap memory of writer.</td>
+ </tr>
<tr>
<td><h5>source.checkpoint-align.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index a825634a09..0cd2638179 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -50,7 +50,7 @@ import java.io.Serializable;
import java.util.Collections;
import static
org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
-import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;
+import static org.apache.paimon.flink.sink.FlinkSink.configureSlotSharingGroup;
import static
org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;
/**
@@ -64,6 +64,8 @@ public class FlinkCdcMultiTableSink implements Serializable {
private final boolean isOverwrite = false;
private final CatalogLoader catalogLoader;
+ private final double writeCpuCores;
+ private final MemorySize writeHeapMemory;
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;
private final String commitUser;
@@ -72,12 +74,16 @@ public class FlinkCdcMultiTableSink implements Serializable
{
public FlinkCdcMultiTableSink(
CatalogLoader catalogLoader,
+ double writeCpuCores,
+ @Nullable MemorySize writeHeapMemory,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory,
String commitUser,
boolean eagerInit,
TableFilter tableFilter) {
this.catalogLoader = catalogLoader;
+ this.writeCpuCores = writeCpuCores;
+ this.writeHeapMemory = writeHeapMemory;
this.commitCpuCores = commitCpuCores;
this.commitHeapMemory = commitHeapMemory;
this.commitUser = commitUser;
@@ -120,6 +126,7 @@ public class FlinkCdcMultiTableSink implements Serializable
{
input.transform(
WRITER_NAME, typeInfo,
createWriteOperator(sinkProvider, commitUser));
forwardParallelism(written, input);
+ configureSlotSharingGroup(written, writeCpuCores, writeHeapMemory);
// shuffle committables by table
DataStream<MultiTableCommittable> partitioned =
@@ -139,7 +146,7 @@ public class FlinkCdcMultiTableSink implements Serializable
{
createCommitterFactory(tableFilter),
createCommittableStateManager()));
forwardParallelism(committed, input);
- configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
+ configureSlotSharingGroup(committed, commitCpuCores, commitHeapMemory);
return committed.sinkTo(new
DiscardingSink<>()).name("end").setParallelism(1);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index b53ec9c80f..d5c33da412 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -67,6 +67,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
private List<FileStoreTable> tables = new ArrayList<>();
@Nullable private Integer parallelism;
+ private double writerCpu;
+ @Nullable private MemorySize writerMemory;
private double committerCpu;
@Nullable private MemorySize committerMemory;
@@ -107,6 +109,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options
options) {
this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
+ this.writerCpu = options.get(FlinkConnectorOptions.SINK_WRITER_CPU);
+ this.writerMemory =
options.get(FlinkConnectorOptions.SINK_WRITER_MEMORY);
this.committerCpu =
options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
this.committerMemory =
options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
this.commitUser = createCommitUser(options);
@@ -190,6 +194,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
FlinkCdcMultiTableSink sink =
new FlinkCdcMultiTableSink(
catalogLoader,
+ writerCpu,
+ writerMemory,
committerCpu,
committerMemory,
commitUser,
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
index 2371fbfb9d..8ed54fac67 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
@@ -50,6 +50,8 @@ public class FlinkCdcMultiTableSinkTest {
FlinkCdcMultiTableSink sink =
new FlinkCdcMultiTableSink(
() -> FlinkCatalogFactory.createPaimonCatalog(new
Options()),
+ FlinkConnectorOptions.SINK_WRITER_CPU.defaultValue(),
+ null,
FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
null,
UUID.randomUUID().toString(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index b39e9e57e3..9c0e9d6191 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -342,6 +342,18 @@ public class FlinkConnectorOptions {
.withDescription(
"If true, a tag will be automatically created for
the snapshot created by flink savepoint.");
+ public static final ConfigOption<Double> SINK_WRITER_CPU =
+ ConfigOptions.key("sink.writer-cpu")
+ .doubleType()
+ .defaultValue(1.0)
+ .withDescription("Sink writer cpu to control cpu cores of
writer.");
+
+ public static final ConfigOption<MemorySize> SINK_WRITER_MEMORY =
+ ConfigOptions.key("sink.writer-memory")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription("Sink writer memory to control heap
memory of writer.");
+
public static final ConfigOption<Double> SINK_COMMITTER_CPU =
ConfigOptions.key("sink.committer-cpu")
.doubleType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index ccceaa1a54..f518f0c8d1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -67,6 +67,8 @@ import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERA
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_CPU;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
import static
org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
import static
org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;
@@ -242,6 +244,9 @@ public abstract class FlinkSink<T> implements Serializable {
declareManagedMemory(written,
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
+ configureSlotSharingGroup(
+ written, options.get(SINK_WRITER_CPU),
options.get(SINK_WRITER_MEMORY));
+
if (!table.primaryKeys().isEmpty() && options.get(PRECOMMIT_COMPACT)) {
SingleOutputStreamOperator<Committable> newWritten =
written.transform(
@@ -318,13 +323,13 @@ public abstract class FlinkSink<T> implements
Serializable {
if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) {
committed = committed.startNewChain();
}
- configureGlobalCommitter(
+ configureSlotSharingGroup(
committed, options.get(SINK_COMMITTER_CPU),
options.get(SINK_COMMITTER_MEMORY));
return committed.sinkTo(new
DiscardingSink<>()).name("end").setParallelism(1);
}
- public static void configureGlobalCommitter(
- SingleOutputStreamOperator<?> committed,
+ public static void configureSlotSharingGroup(
+ SingleOutputStreamOperator<?> operator,
double cpuCores,
@Nullable MemorySize heapMemory) {
if (heapMemory == null) {
@@ -332,13 +337,13 @@ public abstract class FlinkSink<T> implements
Serializable {
}
SlotSharingGroup slotSharingGroup =
- SlotSharingGroup.newBuilder(committed.getName())
+ SlotSharingGroup.newBuilder(operator.getName())
.setCpuCores(cpuCores)
.setTaskHeapMemory(
new org.apache.flink.configuration.MemorySize(
heapMemory.getBytes()))
.build();
- committed.slotSharingGroup(slotSharingGroup);
+ operator.slotSharingGroup(slotSharingGroup);
}
public static void assertStreamingConfiguration(StreamExecutionEnvironment
env) {