This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e4224a1ea [flink] Introduce fine-grained options to increase Committer heap memory only (#1986)
e4224a1ea is described below
commit e4224a1ea89db883342962ded19368d2bf812fd6
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 13 15:34:50 2023 +0800
[flink] Introduce fine-grained options to increase Committer heap memory only (#1986)
---
docs/content/maintenance/write-performance.md | 13 ++++++++-
.../generated/flink_connector_configuration.html | 12 ++++++++
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 28 +++++++-----------
.../cdc/mongodb/MongoDBSyncDatabaseAction.java | 34 ++++++++--------------
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 27 ++++++-----------
.../apache/paimon/flink/FlinkConnectorOptions.java | 14 +++++++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 33 +++++++++++++++++----
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 30 ++++++++++++-------
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 20 +++++++++++--
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 4 ++-
10 files changed, 137 insertions(+), 78 deletions(-)
diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md
index 7c2b711e8..59bdc9267 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -226,7 +226,7 @@ In the initialization of write, the writer of the bucket needs to read all histo
here (For example, writing a large number of partitions simultaneously), you can use `write-manifest-cache` to cache
the read manifest data to accelerate initialization.
-## Memory
+## Write Memory
There are three main places in the Paimon writer that take up memory:
@@ -242,3 +242,14 @@ If your Flink job does not rely on state, please avoid using managed memory, whi
```shell
taskmanager.memory.managed.size=1m
```
+
+## Commit Memory
+
+The Committer node may use a large amount of memory if the volume of data written to the table is particularly large;
+an OOM may occur if the memory is too small. In this case, you need to increase the Committer heap memory, but you may
+not want to increase the memory of Flink's TaskManager uniformly, as that may waste memory.
+
+You can use Flink's fine-grained resource management to increase only the committer heap memory:
+1. Configure the Flink option `cluster.fine-grained-resource-management.enabled: true`. (This is the default since Flink 1.18.)
+2. Configure the Paimon table option `sink.committer-memory`, for example 300 MB, depending on your `TaskManager`.
+   (`sink.committer-cpu` is also supported.)
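
For readers following along, a minimal sketch of the two steps in Java; the catalog setup, the table name `my_table`, and the 300 mb size are illustrative assumptions, not part of this commit:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CommitterMemoryExample {
    public static void main(String[] args) {
        // Step 1 lives in the Flink configuration (flink-conf.yaml), not in code:
        //   cluster.fine-grained-resource-management.enabled: true
        // (the default from Flink 1.18 onwards).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Step 2: raise the committer heap memory on the Paimon table.
        // Assumes a Paimon catalog is already registered and selected;
        // pick a size that fits your TaskManager.
        tEnv.executeSql("ALTER TABLE my_table SET ('sink.committer-memory' = '300 mb')");
    }
}
```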
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 455c84b62..f5bdf73ed 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -128,6 +128,18 @@ under the License.
<td>Duration</td>
            <td>If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators.</td>
</tr>
+ <tr>
+ <td><h5>sink.committer-cpu</h5></td>
+ <td style="word-wrap: break-word;">1.0</td>
+ <td>Double</td>
+            <td>Sink committer cpu to control cpu cores of global committer.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.committer-memory</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+            <td>Sink committer memory to control heap memory of global committer.</td>
+ </tr>
<tr>
<td><h5>sink.managed.writer-buffer-memory</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index f44ce1b01..155b0b5ea 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -168,23 +167,16 @@ public class KafkaSyncDatabaseAction extends ActionBase {
new RichCdcMultiplexRecordEventParser(
                                schemaBuilder, includingPattern, excludingPattern);
- FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
- new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
- .withInput(
- env.fromSource(
- source,
-                                                WatermarkStrategy.noWatermarks(),
- "Kafka Source")
- .flatMap(recordParser))
- .withParserFactory(parserFactory)
- .withCatalogLoader(catalogLoader())
- .withDatabase(database)
- .withMode(MultiTablesSinkMode.COMBINED);
-        String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
- if (sinkParallelism != null) {
- sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
- }
- sinkBuilder.build();
+ new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
+ .withInput(
+                        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
+ .flatMap(recordParser))
+ .withParserFactory(parserFactory)
+ .withCatalogLoader(catalogLoader())
+ .withDatabase(database)
+ .withMode(MultiTablesSinkMode.COMBINED)
+ .withTableOptions(tableConfig)
+ .build();
}
private void validateCaseInsensitive() {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index cf35089ef..be8a9939b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
@@ -127,27 +126,18 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
() ->
new RichCdcMultiplexRecordEventParser(
                                schemaBuilder, includingPattern, excludingPattern);
- FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
- new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
- .withInput(
- env.fromSource(
- source,
-                                                WatermarkStrategy.noWatermarks(),
- "MongoDB Source")
- .flatMap(
- new MongoDBRecordParser(
- caseSensitive,
- tableNameConverter,
- mongodbConfig)))
- .withParserFactory(parserFactory)
- .withCatalogLoader(catalogLoader())
- .withDatabase(database)
- .withMode(MultiTablesSinkMode.COMBINED);
-        String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
- if (sinkParallelism != null) {
- sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
- }
- sinkBuilder.build();
+ new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
+ .withInput(
+                        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB Source")
+ .flatMap(
+ new MongoDBRecordParser(
+                                                caseSensitive, tableNameConverter, mongodbConfig)))
+ .withParserFactory(parserFactory)
+ .withCatalogLoader(catalogLoader())
+ .withDatabase(database)
+ .withMode(MultiTablesSinkMode.COMBINED)
+ .withTableOptions(tableConfig)
+ .build();
}
private void validateCaseInsensitive() {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 66eaa6614..8fc1ad16e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -283,23 +282,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
String database = this.database;
MultiTablesSinkMode mode = this.mode;
- FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
- new FlinkCdcSyncDatabaseSinkBuilder<String>()
- .withInput(
- env.fromSource(
-                                        source, WatermarkStrategy.noWatermarks(), "MySQL Source"))
- .withParserFactory(parserFactory)
- .withDatabase(database)
- .withCatalogLoader(catalogLoader())
- .withTables(fileStoreTables)
- .withMode(mode);
-
-        String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
- if (sinkParallelism != null) {
- sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
- }
-
- sinkBuilder.build();
+ new FlinkCdcSyncDatabaseSinkBuilder<String>()
+                .withInput(env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source"))
+ .withParserFactory(parserFactory)
+ .withDatabase(database)
+ .withCatalogLoader(catalogLoader())
+ .withTables(fileStoreTables)
+ .withMode(mode)
+ .withTableOptions(tableConfig)
+ .build();
}
private void validateCaseInsensitive() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 482696c59..cefd4517a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -267,6 +267,20 @@ public class FlinkConnectorOptions {
.withDescription(
"If true, a tag will be automatically created for
the snapshot created by flink savepoint.");
+ public static final ConfigOption<Double> SINK_COMMITTER_CPU =
+ ConfigOptions.key("sink.committer-cpu")
+ .doubleType()
+ .defaultValue(1.0)
+ .withDescription(
+                            "Sink committer cpu to control cpu cores of global committer.");
+
+ public static final ConfigOption<MemorySize> SINK_COMMITTER_MEMORY =
+ ConfigOptions.key("sink.committer-memory")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription(
+                            "Sink committer memory to control heap memory of global committer.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
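
Not part of the patch, but as a reading aid: the sink resolves these two options from a table's option map through Paimon's `Options`, roughly as in this sketch (the class and method names are hypothetical):

```java
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

import javax.annotation.Nullable;

import java.util.Map;

class CommitterOptionsSketch {
    // Mirrors the Options.fromMap(table.options()) pattern used in FlinkSink below.
    static double committerCpu(Map<String, String> tableOptions) {
        // Falls back to the declared default of 1.0 when unset.
        return Options.fromMap(tableOptions).get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
    }

    @Nullable
    static MemorySize committerMemory(Map<String, String> tableOptions) {
        // No default value declared, so this returns null unless the user sets it.
        return Options.fromMap(tableOptions).get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
    }
}
```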
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 4a010a494..4701e4391 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -26,6 +26,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
@@ -39,6 +40,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.UUID;
@@ -46,6 +49,8 @@ import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -199,14 +204,32 @@ public abstract class FlinkSink<T> implements Serializable {
}
SingleOutputStreamOperator<?> committed =
written.transform(
- GLOBAL_COMMITTER_NAME + " : " + table.name(),
- new CommittableTypeInfo(),
- committerOperator)
- .setParallelism(1)
- .setMaxParallelism(1);
+ GLOBAL_COMMITTER_NAME + " : " + table.name(),
+ new CommittableTypeInfo(),
+ committerOperator);
+ Options options = Options.fromMap(table.options());
+ configureGlobalCommitter(
+                committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
        return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
+ public static void configureGlobalCommitter(
+ SingleOutputStreamOperator<?> committed,
+ double cpuCores,
+ @Nullable MemorySize heapMemory) {
+ committed.setParallelism(1).setMaxParallelism(1);
+ if (heapMemory != null) {
+ SlotSharingGroup slotSharingGroup =
+ SlotSharingGroup.newBuilder(committed.getName())
+ .setCpuCores(cpuCores)
+ .setTaskHeapMemory(
+                                new org.apache.flink.configuration.MemorySize(
+                                        heapMemory.getBytes()))
+ .build();
+ committed.slotSharingGroup(slotSharingGroup);
+ }
+ }
+
    public static void assertStreamingConfiguration(StreamExecutionEnvironment env) {
checkArgument(
!env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
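
The mechanism behind `configureGlobalCommitter` is Flink's fine-grained resource specification: the committer gets its own slot sharing group with an explicit CPU and task-heap budget, so only that slot grows. A standalone sketch (the group name and sizes are illustrative, not from this commit):

```java
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

class CommitterResourceSketch {
    // Pins a single-parallelism committer into a dedicated slot sharing group
    // with its own CPU and task-heap budget.
    static void pinCommitter(SingleOutputStreamOperator<?> committer) {
        committer.setParallelism(1).setMaxParallelism(1);
        SlotSharingGroup group =
                SlotSharingGroup.newBuilder("global-committer") // illustrative name
                        .setCpuCores(1.0)
                        .setTaskHeapMemory(MemorySize.ofMebiBytes(300)) // illustrative size
                        .build();
        committer.slotSharingGroup(group);
    }
}
```

The declared resources only take effect when `cluster.fine-grained-resource-management.enabled` is true, which is why the docs change above lists that as step 1.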
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index c5cd18d35..d1ee0242f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -33,6 +33,7 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.manifest.WrappedManifestCommittable;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -42,10 +43,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.UUID;
import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
+import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;
/**
 * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema change if necessary.
@@ -58,9 +62,16 @@ public class FlinkCdcMultiTableSink implements Serializable {
private final boolean isOverwrite = false;
private final Catalog.Loader catalogLoader;
+ private final double commitCpuCores;
+ @Nullable private final MemorySize commitHeapMemory;
- public FlinkCdcMultiTableSink(Catalog.Loader catalogLoader) {
+ public FlinkCdcMultiTableSink(
+ Catalog.Loader catalogLoader,
+ double commitCpuCores,
+ @Nullable MemorySize commitHeapMemory) {
this.catalogLoader = catalogLoader;
+ this.commitCpuCores = commitCpuCores;
+ this.commitHeapMemory = commitHeapMemory;
}
private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
@@ -103,15 +114,14 @@ public class FlinkCdcMultiTableSink implements Serializable {
SingleOutputStreamOperator<?> committed =
written.transform(
- GLOBAL_COMMITTER_NAME,
- typeInfo,
- new CommitterOperator<>(
- true,
- commitUser,
- createCommitterFactory(),
- createCommittableStateManager()))
- .setParallelism(1)
- .setMaxParallelism(1);
+ GLOBAL_COMMITTER_NAME,
+ typeInfo,
+ new CommitterOperator<>(
+ true,
+ commitUser,
+ createCommitterFactory(),
+ createCommittableStateManager()));
+ configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
        return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 552ec40ef..1d86b7ecb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -20,9 +20,12 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -36,6 +39,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -60,6 +64,9 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
private List<FileStoreTable> tables = new ArrayList<>();
@Nullable private Integer parallelism;
+ private double committerCpu;
+ @Nullable private MemorySize committerMemory;
+
// Paimon catalog used to check and create tables. There will be two
// places where this catalog is used. 1) in processing function,
// it will check newly added tables and create the corresponding
@@ -86,8 +93,14 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
return this;
}
-    public FlinkCdcSyncDatabaseSinkBuilder<T> withParallelism(@Nullable Integer parallelism) {
- this.parallelism = parallelism;
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Map<String, String> options) {
+ return withTableOptions(Options.fromMap(options));
+ }
+
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options options) {
+ this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
+        this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
+        this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
return this;
}
@@ -148,7 +161,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
partitioned.setParallelism(parallelism);
}
-        FlinkCdcMultiTableSink sink = new FlinkCdcMultiTableSink(catalogLoader);
+        FlinkCdcMultiTableSink sink =
+                new FlinkCdcMultiTableSink(catalogLoader, committerCpu, committerMemory);
        sink.sinkFrom(new DataStream<>(input.getExecutionEnvironment(), partitioned));
}
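
Callers now hand the raw table options to the builder instead of pre-parsing a parallelism. A hedged sketch of the option map that `withTableOptions(Map)` consumes (the values are illustrative, and the other builder calls are elided as in the actions above):

```java
import java.util.HashMap;
import java.util.Map;

class TableOptionsSketch {
    // Parallelism and committer resources now travel together in one map,
    // keyed by sink.parallelism, sink.committer-memory and sink.committer-cpu.
    static Map<String, String> committerAwareOptions() {
        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put("sink.parallelism", "4");           // illustrative
        tableConfig.put("sink.committer-memory", "300 mb"); // illustrative
        tableConfig.put("sink.committer-cpu", "1.0");       // matches the default
        return tableConfig;
    }
}
```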
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index c8b0a754b..cfeaf0110 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -57,6 +57,8 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
+
/** IT cases for {@link FlinkCdcSyncDatabaseSinkBuilder}. */
public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {
@@ -160,7 +162,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {
.withTables(fileStoreTables)
                        // because we have at most 3 tables and 8 slots in AbstractTestBase
// each table can only get 2 slots
- .withParallelism(2)
+                        .withTableOptions(Collections.singletonMap(SINK_PARALLELISM.key(), "2"))
.withDatabase(DATABASE_NAME)
.withCatalogLoader(catalogLoader)
.build();