This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e4224a1ea [flink] Introduce fine-grained options to increase Committer heap memory only (#1986)
e4224a1ea is described below

commit e4224a1ea89db883342962ded19368d2bf812fd6
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 13 15:34:50 2023 +0800

    [flink] Introduce fine-grained options to increase Committer heap memory only (#1986)
---
 docs/content/maintenance/write-performance.md      | 13 ++++++++-
 .../generated/flink_connector_configuration.html   | 12 ++++++++
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  | 28 +++++++-----------
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     | 34 ++++++++--------------
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 27 ++++++-----------
 .../apache/paimon/flink/FlinkConnectorOptions.java | 14 +++++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 33 +++++++++++++++++----
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     | 30 ++++++++++++-------
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  | 20 +++++++++++--
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   |  4 ++-
 10 files changed, 137 insertions(+), 78 deletions(-)

diff --git a/docs/content/maintenance/write-performance.md 
b/docs/content/maintenance/write-performance.md
index 7c2b711e8..59bdc9267 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -226,7 +226,7 @@ In the initialization of write, the writer of the bucket needs to read all histo
 here (For example, writing a large number of partitions simultaneously), you can use `write-manifest-cache` to cache
 the read manifest data to accelerate initialization.
 
-## Memory
+## Write Memory
 
 There are three main places in Paimon writer that takes up memory:
 
@@ -242,3 +242,14 @@ If your Flink job does not rely on state, please avoid using managed memory, whi
 ```shell
 taskmanager.memory.managed.size=1m
 ```
+
+## Commit Memory
+
+Committer node may use a large memory if the amount of data written to the table is particularly large, OOM may occur
+if the memory is too small. In this case, you need to increase the Committer heap memory, but you may not want to
+increase the memory of Flink's TaskManager uniformly, which may lead to a waste of memory.
+
+You can use fine-grained-resource-management of Flink to increase committer heap memory only:
+1. Configure Flink Configuration `cluster.fine-grained-resource-management.enabled: true`. (This is default after Flink 1.18)
+2. Configure Paimon Table Options: `sink.committer-memory`, for example 300 MB, depends on your `TaskManager`.
+   (`sink.committer-cpu` is also supported)
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 455c84b62..f5bdf73ed 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -128,6 +128,18 @@ under the License.
             <td>Duration</td>
             <td>If no records flow in a partition of a stream for that amount 
of time, then that partition is considered "idle" and will not hold back the 
progress of watermarks in downstream operators.</td>
         </tr>
+        <tr>
+            <td><h5>sink.committer-cpu</h5></td>
+            <td style="word-wrap: break-word;">1.0</td>
+            <td>Double</td>
+            <td>Sink committer cpu to control cpu cores of global 
committer.</td>
+        </tr>
+        <tr>
+            <td><h5>sink.committer-memory</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Sink committer memory to control heap memory of global 
committer.</td>
+        </tr>
         <tr>
             <td><h5>sink.managed.writer-buffer-memory</h5></td>
             <td style="word-wrap: break-word;">256 mb</td>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index f44ce1b01..155b0b5ea 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -168,23 +167,16 @@ public class KafkaSyncDatabaseAction extends ActionBase {
                         new RichCdcMultiplexRecordEventParser(
                                 schemaBuilder, includingPattern, 
excludingPattern);
 
-        FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
-                new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
-                        .withInput(
-                                env.fromSource(
-                                                source,
-                                                
WatermarkStrategy.noWatermarks(),
-                                                "Kafka Source")
-                                        .flatMap(recordParser))
-                        .withParserFactory(parserFactory)
-                        .withCatalogLoader(catalogLoader())
-                        .withDatabase(database)
-                        .withMode(MultiTablesSinkMode.COMBINED);
-        String sinkParallelism = 
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
-        if (sinkParallelism != null) {
-            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
-        }
-        sinkBuilder.build();
+        new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
+                .withInput(
+                        env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "Kafka Source")
+                                .flatMap(recordParser))
+                .withParserFactory(parserFactory)
+                .withCatalogLoader(catalogLoader())
+                .withDatabase(database)
+                .withMode(MultiTablesSinkMode.COMBINED)
+                .withTableOptions(tableConfig)
+                .build();
     }
 
     private void validateCaseInsensitive() {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index cf35089ef..be8a9939b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.action.cdc.mongodb;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
 import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
@@ -127,27 +126,18 @@ public class MongoDBSyncDatabaseAction extends ActionBase 
{
                 () ->
                         new RichCdcMultiplexRecordEventParser(
                                 schemaBuilder, includingPattern, 
excludingPattern);
-        FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
-                new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
-                        .withInput(
-                                env.fromSource(
-                                                source,
-                                                
WatermarkStrategy.noWatermarks(),
-                                                "MongoDB Source")
-                                        .flatMap(
-                                                new MongoDBRecordParser(
-                                                        caseSensitive,
-                                                        tableNameConverter,
-                                                        mongodbConfig)))
-                        .withParserFactory(parserFactory)
-                        .withCatalogLoader(catalogLoader())
-                        .withDatabase(database)
-                        .withMode(MultiTablesSinkMode.COMBINED);
-        String sinkParallelism = 
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
-        if (sinkParallelism != null) {
-            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
-        }
-        sinkBuilder.build();
+        new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
+                .withInput(
+                        env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "MongoDB Source")
+                                .flatMap(
+                                        new MongoDBRecordParser(
+                                                caseSensitive, 
tableNameConverter, mongodbConfig)))
+                .withParserFactory(parserFactory)
+                .withCatalogLoader(catalogLoader())
+                .withDatabase(database)
+                .withMode(MultiTablesSinkMode.COMBINED)
+                .withTableOptions(tableConfig)
+                .build();
     }
 
     private void validateCaseInsensitive() {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 66eaa6614..8fc1ad16e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -283,23 +282,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
 
         String database = this.database;
         MultiTablesSinkMode mode = this.mode;
-        FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
-                new FlinkCdcSyncDatabaseSinkBuilder<String>()
-                        .withInput(
-                                env.fromSource(
-                                        source, 
WatermarkStrategy.noWatermarks(), "MySQL Source"))
-                        .withParserFactory(parserFactory)
-                        .withDatabase(database)
-                        .withCatalogLoader(catalogLoader())
-                        .withTables(fileStoreTables)
-                        .withMode(mode);
-
-        String sinkParallelism = 
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
-        if (sinkParallelism != null) {
-            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
-        }
-
-        sinkBuilder.build();
+        new FlinkCdcSyncDatabaseSinkBuilder<String>()
+                .withInput(env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "MySQL Source"))
+                .withParserFactory(parserFactory)
+                .withDatabase(database)
+                .withCatalogLoader(catalogLoader())
+                .withTables(fileStoreTables)
+                .withMode(mode)
+                .withTableOptions(tableConfig)
+                .build();
     }
 
     private void validateCaseInsensitive() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 482696c59..cefd4517a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -267,6 +267,20 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "If true, a tag will be automatically created for 
the snapshot created by flink savepoint.");
 
+    public static final ConfigOption<Double> SINK_COMMITTER_CPU =
+            ConfigOptions.key("sink.committer-cpu")
+                    .doubleType()
+                    .defaultValue(1.0)
+                    .withDescription(
+                            "Sink committer cpu to control cpu cores of global 
committer.");
+
+    public static final ConfigOption<MemorySize> SINK_COMMITTER_MEMORY =
+            ConfigOptions.key("sink.committer-memory")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Sink committer memory to control heap memory of 
global committer.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 4a010a494..4701e4391 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -26,6 +26,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
@@ -39,6 +40,8 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.UUID;
 
@@ -46,6 +49,8 @@ import static 
org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -199,14 +204,32 @@ public abstract class FlinkSink<T> implements 
Serializable {
         }
         SingleOutputStreamOperator<?> committed =
                 written.transform(
-                                GLOBAL_COMMITTER_NAME + " : " + table.name(),
-                                new CommittableTypeInfo(),
-                                committerOperator)
-                        .setParallelism(1)
-                        .setMaxParallelism(1);
+                        GLOBAL_COMMITTER_NAME + " : " + table.name(),
+                        new CommittableTypeInfo(),
+                        committerOperator);
+        Options options = Options.fromMap(table.options());
+        configureGlobalCommitter(
+                committed, options.get(SINK_COMMITTER_CPU), 
options.get(SINK_COMMITTER_MEMORY));
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
+    public static void configureGlobalCommitter(
+            SingleOutputStreamOperator<?> committed,
+            double cpuCores,
+            @Nullable MemorySize heapMemory) {
+        committed.setParallelism(1).setMaxParallelism(1);
+        if (heapMemory != null) {
+            SlotSharingGroup slotSharingGroup =
+                    SlotSharingGroup.newBuilder(committed.getName())
+                            .setCpuCores(cpuCores)
+                            .setTaskHeapMemory(
+                                    new 
org.apache.flink.configuration.MemorySize(
+                                            heapMemory.getBytes()))
+                            .build();
+            committed.slotSharingGroup(slotSharingGroup);
+        }
+    }
+
     public static void assertStreamingConfiguration(StreamExecutionEnvironment 
env) {
         checkArgument(
                 !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index c5cd18d35..d1ee0242f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -33,6 +33,7 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
 import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -42,10 +43,13 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.UUID;
 
 import static 
org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
+import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;
 
 /**
  * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema 
change if necessary.
@@ -58,9 +62,16 @@ public class FlinkCdcMultiTableSink implements Serializable {
 
     private final boolean isOverwrite = false;
     private final Catalog.Loader catalogLoader;
+    private final double commitCpuCores;
+    @Nullable private final MemorySize commitHeapMemory;
 
-    public FlinkCdcMultiTableSink(Catalog.Loader catalogLoader) {
+    public FlinkCdcMultiTableSink(
+            Catalog.Loader catalogLoader,
+            double commitCpuCores,
+            @Nullable MemorySize commitHeapMemory) {
         this.catalogLoader = catalogLoader;
+        this.commitCpuCores = commitCpuCores;
+        this.commitHeapMemory = commitHeapMemory;
     }
 
     private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
@@ -103,15 +114,14 @@ public class FlinkCdcMultiTableSink implements 
Serializable {
 
         SingleOutputStreamOperator<?> committed =
                 written.transform(
-                                GLOBAL_COMMITTER_NAME,
-                                typeInfo,
-                                new CommitterOperator<>(
-                                        true,
-                                        commitUser,
-                                        createCommitterFactory(),
-                                        createCommittableStateManager()))
-                        .setParallelism(1)
-                        .setMaxParallelism(1);
+                        GLOBAL_COMMITTER_NAME,
+                        typeInfo,
+                        new CommitterOperator<>(
+                                true,
+                                commitUser,
+                                createCommitterFactory(),
+                                createCommittableStateManager()));
+        configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 552ec40ef..1d86b7ecb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -20,9 +20,12 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
@@ -36,6 +39,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -60,6 +64,9 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     private List<FileStoreTable> tables = new ArrayList<>();
 
     @Nullable private Integer parallelism;
+    private double committerCpu;
+    @Nullable private MemorySize committerMemory;
+
     // Paimon catalog used to check and create tables. There will be two
     //     places where this catalog is used. 1) in processing function,
     //     it will check newly added tables and create the corresponding
@@ -86,8 +93,14 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         return this;
     }
 
-    public FlinkCdcSyncDatabaseSinkBuilder<T> withParallelism(@Nullable 
Integer parallelism) {
-        this.parallelism = parallelism;
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Map<String, 
String> options) {
+        return withTableOptions(Options.fromMap(options));
+    }
+
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options 
options) {
+        this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
+        this.committerCpu = 
options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
+        this.committerMemory = 
options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
         return this;
     }
 
@@ -148,7 +161,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
             partitioned.setParallelism(parallelism);
         }
 
-        FlinkCdcMultiTableSink sink = new 
FlinkCdcMultiTableSink(catalogLoader);
+        FlinkCdcMultiTableSink sink =
+                new FlinkCdcMultiTableSink(catalogLoader, committerCpu, 
committerMemory);
         sink.sinkFrom(new DataStream<>(input.getExecutionEnvironment(), 
partitioned));
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index c8b0a754b..cfeaf0110 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -57,6 +57,8 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
 
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
+
 /** IT cases for {@link FlinkCdcSyncDatabaseSinkBuilder}. */
 public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {
 
@@ -160,7 +162,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
                 .withTables(fileStoreTables)
                 // because we have at most 3 tables and 8 slots in 
AbstractTestBase
                 // each table can only get 2 slots
-                .withParallelism(2)
+                
.withTableOptions(Collections.singletonMap(SINK_PARALLELISM.key(), "2"))
                 .withDatabase(DATABASE_NAME)
                 .withCatalogLoader(catalogLoader)
                 .build();

Reply via email to