This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aca3fd592 [flink][cdc] Fix that setting both table affix and including 
pattern causes table cannot be synchronized in kafka cdc (#2047)
aca3fd592 is described below

commit aca3fd592db147ed88b0520da628cb24c3372522
Author: yuzelin <[email protected]>
AuthorDate: Mon Sep 25 18:24:35 2023 +0800

    [flink][cdc] Fix that setting both table affix and including pattern causes 
table cannot be synchronized in kafka cdc (#2047)
---
 paimon-flink/paimon-flink-cdc/pom.xml              |  8 ++++
 .../paimon/flink/action/cdc/kafka/KafkaSchema.java |  5 +-
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  | 12 +++--
 .../action/cdc/kafka/KafkaSyncTableAction.java     |  9 +---
 .../flink/action/cdc/kafka/formats/DataFormat.java |  9 +---
 .../action/cdc/kafka/formats/RecordParser.java     | 12 ++---
 .../cdc/kafka/formats/RecordParserFactory.java     |  7 +--
 .../cdc/kafka/formats/canal/CanalRecordParser.java |  8 +---
 .../kafka/formats/maxwell/MaxwellRecordParser.java |  8 +---
 .../cdc/kafka/formats/ogg/OggRecordParser.java     |  8 +---
 .../action/cdc/mongodb/MongoDBRecordParser.java    | 13 ++----
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     | 15 +++---
 .../action/cdc/mongodb/MongoDBSyncTableAction.java |  4 +-
 .../flink/sink/cdc/CdcDynamicBucketSink.java       |  0
 .../sink/cdc/CdcDynamicBucketWriteOperator.java    |  0
 .../cdc/CdcDynamicTableParsingProcessFunction.java |  0
 .../flink/sink/cdc/CdcHashKeyChannelComputer.java  |  0
 .../cdc/CdcMultiTableParsingProcessFunction.java   |  0
 .../paimon/flink/sink/cdc/CdcMultiplexRecord.java  |  0
 .../cdc/CdcMultiplexRecordChannelComputer.java     |  0
 .../flink/sink/cdc/CdcParsingProcessFunction.java  |  0
 .../apache/paimon/flink/sink/cdc/CdcRecord.java    |  0
 .../flink/sink/cdc/CdcRecordChannelComputer.java   |  0
 .../sink/cdc/CdcRecordKeyAndBucketExtractor.java   |  0
 .../sink/cdc/CdcRecordPartitionKeyExtractor.java   |  0
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java |  0
 .../sink/cdc/CdcRecordStoreWriteOperator.java      |  0
 .../paimon/flink/sink/cdc/CdcRecordUtils.java      |  0
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      |  0
 .../sink/cdc/CdcWithBucketChannelComputer.java     |  0
 .../apache/paimon/flink/sink/cdc/EventParser.java  |  0
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |  0
 .../apache/paimon/flink/sink/cdc/FlinkCdcSink.java |  0
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |  0
 ...MultiTableUpdatedDataFieldsProcessFunction.java |  0
 .../flink/sink/cdc/NewTableSchemaBuilder.java      |  0
 .../flink/sink/cdc/RichCdcMultiplexRecord.java     |  0
 .../cdc/RichCdcMultiplexRecordEventParser.java     | 12 +++--
 .../cdc/RichCdcMultiplexRecordSchemaBuilder.java   |  0
 .../paimon/flink/sink/cdc/RichCdcRecord.java       |  0
 .../paimon/flink/sink/cdc/RichCdcSinkBuilder.java  |  0
 .../paimon/flink/sink/cdc/RichEventParser.java     |  0
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java |  0
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  |  0
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  |  5 ++
 .../KafkaMaxwellSyncDatabaseActionITCase.java      |  5 ++
 .../kafka/KafkaOggSyncDatabaseActionITCase.java    |  5 ++
 .../mongodb/MongoDBSyncDatabaseActionITCase.java   | 54 +++++++++++++++++++---
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   |  2 +
 .../cdc/CdcMultiplexRecordChannelComputerTest.java |  0
 .../sink/cdc/CdcRecordChannelComputerTest.java     |  0
 .../cdc/CdcRecordKeyAndBucketExtractorTest.java    |  0
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  |  0
 .../sink/cdc/CdcRecordStoreWriteOperatorTest.java  |  0
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   |  0
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      |  0
 .../apache/paimon/flink/sink/cdc/TestCdcEvent.java |  0
 .../paimon/flink/sink/cdc/TestCdcEventParser.java  |  0
 .../flink/sink/cdc/TestCdcSourceFunction.java      |  0
 .../apache/paimon/flink/sink/cdc/TestTable.java    |  0
 paimon-flink/paimon-flink-common/pom.xml           |  8 ----
 .../sink/MultiTablesStoreCompactOperator.java      |  3 +-
 62 files changed, 115 insertions(+), 97 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/pom.xml 
b/paimon-flink/paimon-flink-cdc/pom.xml
index d8e1591d2..3054993b3 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -202,6 +202,14 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-test-utils</artifactId>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 58ff74d84..a01ce3de6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
@@ -132,9 +131,7 @@ public class KafkaSchema {
         int retryInterval = 1000;
 
         DataFormat format = getDataFormat(kafkaConfig);
-        RecordParser recordParser =
-                format.createParser(
-                        true, new TableNameConverter(true), typeMapping, 
Collections.emptyList());
+        RecordParser recordParser = format.createParser(true, typeMapping, 
Collections.emptyList());
 
         while (true) {
             ConsumerRecords<String, String> consumerRecords =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 3ad479a68..61aa45108 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -148,24 +148,26 @@ public class KafkaSyncDatabaseAction extends ActionBase {
         }
 
         catalog.createDatabase(database, true);
-        TableNameConverter tableNameConverter =
-                new TableNameConverter(caseSensitive, true, tablePrefix, 
tableSuffix);
 
         KafkaSource<String> source = 
KafkaActionUtils.buildKafkaSource(kafkaConfig);
 
         DataFormat format = DataFormat.getDataFormat(kafkaConfig);
         RecordParser recordParser =
-                format.createParser(
-                        caseSensitive, tableNameConverter, typeMapping, 
Collections.emptyList());
+                format.createParser(caseSensitive, typeMapping, 
Collections.emptyList());
         RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
                 new RichCdcMultiplexRecordSchemaBuilder(tableConfig, 
caseSensitive);
         Pattern includingPattern = Pattern.compile(includingTables);
         Pattern excludingPattern =
                 excludingTables == null ? null : 
Pattern.compile(excludingTables);
+        TableNameConverter tableNameConverter =
+                new TableNameConverter(caseSensitive, true, tablePrefix, 
tableSuffix);
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
                 () ->
                         new RichCdcMultiplexRecordEventParser(
-                                schemaBuilder, includingPattern, 
excludingPattern);
+                                schemaBuilder,
+                                includingPattern,
+                                excludingPattern,
+                                tableNameConverter);
 
         new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
                 .withInput(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 0b362d4fb..2804b04e0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -26,7 +26,6 @@ import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
@@ -171,13 +170,9 @@ public class KafkaSyncTableAction extends ActionBase {
         }
         DataFormat format = DataFormat.getDataFormat(kafkaConfig);
         RecordParser recordParser =
-                format.createParser(
-                        caseSensitive,
-                        new TableNameConverter(caseSensitive),
-                        typeMapping,
-                        computedColumns);
+                format.createParser(caseSensitive, typeMapping, 
computedColumns);
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
-                RichCdcMultiplexRecordEventParser::new;
+                () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
 
         CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
                 new CdcSinkBuilder<RichCdcMultiplexRecord>()
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
index fda6311f2..71c4af8f4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.kafka.formats;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import 
org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
 import 
org.apache.paimon.flink.action.cdc.kafka.formats.maxwell.MaxwellRecordParser;
@@ -54,16 +53,12 @@ public enum DataFormat {
      * configurations.
      *
      * @param caseSensitive Indicates whether the parser should be 
case-sensitive.
-     * @param tableNameConverter Converter to transform table names.
      * @param computedColumns List of computed columns to be considered by the 
parser.
      * @return A new instance of {@link RecordParser}.
      */
     public RecordParser createParser(
-            boolean caseSensitive,
-            TableNameConverter tableNameConverter,
-            TypeMapping typeMapping,
-            List<ComputedColumn> computedColumns) {
-        return parser.createParser(caseSensitive, typeMapping, 
tableNameConverter, computedColumns);
+            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
+        return parser.createParser(caseSensitive, typeMapping, 
computedColumns);
     }
 
     /**
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
index a31df4811..5e3d0f886 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.kafka.formats;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
@@ -64,7 +63,6 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
     protected static final String FIELD_TABLE = "table";
     protected static final String FIELD_DATABASE = "database";
     protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    protected final TableNameConverter tableNameConverter;
     protected final boolean caseSensitive;
     protected final TypeMapping typeMapping;
     protected final List<ComputedColumn> computedColumns;
@@ -77,13 +75,9 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
     protected String tableName;
 
     public RecordParser(
-            boolean caseSensitive,
-            TypeMapping typeMapping,
-            TableNameConverter tableNameConverter,
-            List<ComputedColumn> computedColumns) {
+            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
         this.caseSensitive = caseSensitive;
         this.typeMapping = typeMapping;
-        this.tableNameConverter = tableNameConverter;
         this.computedColumns = computedColumns;
     }
 
@@ -93,7 +87,7 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
             return null;
         }
         databaseName = extractStringFromRootJson(FIELD_DATABASE);
-        tableName = 
tableNameConverter.convert(extractStringFromRootJson(FIELD_TABLE));
+        tableName = extractStringFromRootJson(FIELD_TABLE);
         this.setPrimaryField();
         this.setDataField();
         this.validateFormat();
@@ -148,7 +142,7 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
         validateFormat();
 
         databaseName = extractStringFromRootJson(FIELD_DATABASE);
-        tableName = 
tableNameConverter.convert(extractStringFromRootJson(FIELD_TABLE));
+        tableName = extractStringFromRootJson(FIELD_TABLE);
 
         extractRecords().forEach(out::collect);
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
index 413ecbc4f..a47bca23f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.kafka.formats;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import java.util.List;
@@ -40,13 +39,9 @@ public interface RecordParserFactory {
      *
      * @param caseSensitive Indicates whether the parser should be 
case-sensitive.
      * @param typeMapping Data type mapping options.
-     * @param tableNameConverter Converter to transform table names.
      * @param computedColumns List of computed columns to be considered by the 
parser.
      * @return A new instance of {@link RecordParser}.
      */
     RecordParser createParser(
-            boolean caseSensitive,
-            TypeMapping typeMapping,
-            TableNameConverter tableNameConverter,
-            List<ComputedColumn> computedColumns);
+            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns);
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
index 42ac8c948..e0f2ec748 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.kafka.formats.canal;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
@@ -73,11 +72,8 @@ public class CanalRecordParser extends RecordParser {
     }
 
     public CanalRecordParser(
-            boolean caseSensitive,
-            TypeMapping typeMapping,
-            TableNameConverter tableNameConverter,
-            List<ComputedColumn> computedColumns) {
-        super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
+        super(caseSensitive, typeMapping, computedColumns);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
index 0a2df9fcd..8b51edad9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.kafka.formats.maxwell;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -54,11 +53,8 @@ public class MaxwellRecordParser extends RecordParser {
     private static final String OP_DELETE = "delete";
 
     public MaxwellRecordParser(
-            boolean caseSensitive,
-            TypeMapping typeMapping,
-            TableNameConverter tableNameConverter,
-            List<ComputedColumn> computedColumns) {
-        super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
+        super(caseSensitive, typeMapping, computedColumns);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
index 24e5fe4b1..7f4fcc76a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.kafka.formats.ogg;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -57,11 +56,8 @@ public class OggRecordParser extends RecordParser {
     private static final String OP_DELETE = "D";
 
     public OggRecordParser(
-            boolean caseSensitive,
-            TypeMapping typeMapping,
-            TableNameConverter tableNameConverter,
-            List<ComputedColumn> computedColumns) {
-        super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
+        super(caseSensitive, typeMapping, computedColumns);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index 3f2c33f8e..31ce83df1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.mongodb;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import 
org.apache.paimon.flink.action.cdc.mongodb.strategy.Mongo4VersionStrategy;
 import 
org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -61,24 +60,18 @@ public class MongoDBRecordParser implements 
FlatMapFunction<String, RichCdcMulti
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private final List<ComputedColumn> computedColumns;
     private final boolean caseSensitive;
-    private final TableNameConverter tableNameConverter;
     private final Configuration mongodbConfig;
     private JsonNode root;
 
-    public MongoDBRecordParser(
-            boolean caseSensitive,
-            TableNameConverter tableNameConverter,
-            Configuration mongodbConfig) {
-        this(caseSensitive, tableNameConverter, Collections.emptyList(), 
mongodbConfig);
+    public MongoDBRecordParser(boolean caseSensitive, Configuration 
mongodbConfig) {
+        this(caseSensitive, Collections.emptyList(), mongodbConfig);
     }
 
     public MongoDBRecordParser(
             boolean caseSensitive,
-            TableNameConverter tableNameConverter,
             List<ComputedColumn> computedColumns,
             Configuration mongodbConfig) {
         this.caseSensitive = caseSensitive;
-        this.tableNameConverter = tableNameConverter;
         this.computedColumns = computedColumns;
         this.mongodbConfig = mongodbConfig;
     }
@@ -87,7 +80,7 @@ public class MongoDBRecordParser implements 
FlatMapFunction<String, RichCdcMulti
     public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) 
throws Exception {
         root = OBJECT_MAPPER.readValue(value, JsonNode.class);
         String databaseName = extractString(FIELD_DATABASE);
-        String collection = 
tableNameConverter.convert(extractString(FIELD_TABLE));
+        String collection = extractString(FIELD_TABLE);
         MongoVersionStrategy versionStrategy =
                 VersionStrategyFactory.create(
                         databaseName, collection, caseSensitive, 
computedColumns, mongodbConfig);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index ea4d7a850..b407e22a5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -125,10 +125,8 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
         }
 
         catalog.createDatabase(database, true);
-        TableNameConverter tableNameConverter =
-                new TableNameConverter(caseSensitive, true, tablePrefix, 
tableSuffix);
-        List<Identifier> excludedTables = new ArrayList<>();
 
+        List<Identifier> excludedTables = new ArrayList<>();
         MongoDBSource<String> source =
                 MongoDBActionUtils.buildMongodbSource(
                         mongodbConfig,
@@ -143,16 +141,19 @@ public class MongoDBSyncDatabaseAction extends ActionBase 
{
         Pattern includingPattern = Pattern.compile(this.includingTables);
         Pattern excludingPattern =
                 excludingTables == null ? null : 
Pattern.compile(excludingTables);
+        TableNameConverter tableNameConverter =
+                new TableNameConverter(caseSensitive, true, tablePrefix, 
tableSuffix);
         parserFactory =
                 () ->
                         new RichCdcMultiplexRecordEventParser(
-                                schemaBuilder, includingPattern, 
excludingPattern);
+                                schemaBuilder,
+                                includingPattern,
+                                excludingPattern,
+                                tableNameConverter);
         new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
                 .withInput(
                         env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "MongoDB Source")
-                                .flatMap(
-                                        new MongoDBRecordParser(
-                                                caseSensitive, 
tableNameConverter, mongodbConfig)))
+                                .flatMap(new 
MongoDBRecordParser(caseSensitive, mongodbConfig)))
                 .withParserFactory(parserFactory)
                 .withCatalogLoader(catalogLoader())
                 .withDatabase(database)
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 1c6b78ae3..3b25c2b69 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -23,7 +23,6 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -150,7 +149,7 @@ public class MongoDBSyncTableAction extends ActionBase {
         }
 
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
-                RichCdcMultiplexRecordEventParser::new;
+                () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
 
         CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
                 new CdcSinkBuilder<RichCdcMultiplexRecord>()
@@ -162,7 +161,6 @@ public class MongoDBSyncTableAction extends ActionBase {
                                         .flatMap(
                                                 new MongoDBRecordParser(
                                                         caseSensitive,
-                                                        new 
TableNameConverter(caseSensitive),
                                                         computedColumns,
                                                         mongodbConfig)))
                         .withParserFactory(parserFactory)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
similarity index 90%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 5153664ad..0e19f8c88 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
 
@@ -44,6 +45,7 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
     private final NewTableSchemaBuilder<RichCdcMultiplexRecord> schemaBuilder;
     @Nullable private final Pattern includingPattern;
     @Nullable private final Pattern excludingPattern;
+    private final TableNameConverter tableNameConverter;
     private final Map<String, RichEventParser> parsers = new HashMap<>();
     private final Set<String> includedTables = new HashSet<>();
     private final Set<String> excludedTables = new HashSet<>();
@@ -54,17 +56,19 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
     private boolean shouldSynchronizeCurrentTable;
     private RichEventParser currentParser;
 
-    public RichCdcMultiplexRecordEventParser() {
-        this(record -> Optional.empty(), null, null);
+    public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
+        this(record -> Optional.empty(), null, null, new 
TableNameConverter(caseSensitive));
     }
 
     public RichCdcMultiplexRecordEventParser(
             NewTableSchemaBuilder<RichCdcMultiplexRecord> schemaBuilder,
             @Nullable Pattern includingPattern,
-            @Nullable Pattern excludingPattern) {
+            @Nullable Pattern excludingPattern,
+            TableNameConverter tableNameConverter) {
         this.schemaBuilder = schemaBuilder;
         this.includingPattern = includingPattern;
         this.excludingPattern = excludingPattern;
+        this.tableNameConverter = tableNameConverter;
     }
 
     @Override
@@ -80,7 +84,7 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
 
     @Override
     public String parseTableName() {
-        return currentTable;
+        return tableNameConverter.convert(currentTable);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index d7a2bbc8f..62253a6c8 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -296,6 +297,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
                         .withTablePrefix("test_prefix_")
                         .withTableSuffix("_test_suffix")
                         .withTableConfig(getBasicTableConfig())
+                        // test including check with affix
+                        
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -345,6 +348,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
                         .withTablePrefix("test_prefix_")
                         .withTableSuffix("_test_suffix")
                         .withTableConfig(getBasicTableConfig())
+                        // test including check with affix
+                        
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
                         .build();
         runActionWithDefaultEnv(action);
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
index 5a38bd542..645a0a9c9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -263,6 +264,8 @@ public class KafkaMaxwellSyncDatabaseActionITCase extends 
KafkaActionITCaseBase
                         .withTablePrefix("test_prefix_")
                         .withTableSuffix("_test_suffix")
                         .withTableConfig(getBasicTableConfig())
+                        // test including check with affix
+                        
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -317,6 +320,8 @@ public class KafkaMaxwellSyncDatabaseActionITCase extends 
KafkaActionITCaseBase
                         .withTablePrefix("test_prefix_")
                         .withTableSuffix("_test_suffix")
                         .withTableConfig(getBasicTableConfig())
+                        // test including check with affix
+                        
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
                         .build();
         runActionWithDefaultEnv(action);
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
index 538b109a2..b452aa443 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -261,6 +262,8 @@ public class KafkaOggSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
                         .withTablePrefix("TEST_PREFIX_")
                         .withTableSuffix("_TEST_SUFFIX")
                         .withTableConfig(getBasicTableConfig())
+                        // test including check with affix
+                        
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "T1|T2" : ".*")
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -312,6 +315,8 @@ public class KafkaOggSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
                         .withTablePrefix("TEST_PREFIX_")
                         .withTableSuffix("_TEST_SUFFIX")
                         .withTableConfig(getBasicTableConfig())
+                        // test including check with affix
+                        
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "T1|T2" : ".*")
                         .build();
         runActionWithDefaultEnv(action);
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index 0b55ce8ab..1e736534f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -32,6 +32,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -52,14 +53,15 @@ public class MongoDBSyncDatabaseActionITCase extends 
MongoDBActionITCaseBase {
                         .build();
         runActionWithDefaultEnv(action);
 
-        testSchemaEvolutionImpl();
+        testSchemaEvolutionImpl("t1", "t2", database);
     }
 
-    private void testSchemaEvolutionImpl() throws Exception {
-        waitingTables("t1", "t2");
+    private void testSchemaEvolutionImpl(String table1Name, String table2Name, 
String dbName)
+            throws Exception {
+        waitingTables(table1Name, table2Name);
 
-        FileStoreTable table1 = getFileStoreTable("t1");
-        FileStoreTable table2 = getFileStoreTable("t2");
+        FileStoreTable table1 = getFileStoreTable(table1Name);
+        FileStoreTable table2 = getFileStoreTable(table2Name);
 
         RowType rowType1 =
                 RowType.of(
@@ -95,7 +97,7 @@ public class MongoDBSyncDatabaseActionITCase extends 
MongoDBActionITCaseBase {
                         "+I[100000000000000000000103, user_3, Hangzhou, 
1235567891234]");
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
-        writeRecordsToMongoDB("test-data-3", database, "database");
+        writeRecordsToMongoDB("test-data-3", dbName, "database");
 
         expected =
                 Arrays.asList(
@@ -104,7 +106,7 @@ public class MongoDBSyncDatabaseActionITCase extends 
MongoDBActionITCaseBase {
                         "+I[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8]");
         waitForResult(expected, table1, rowType1, primaryKeys1);
 
-        writeRecordsToMongoDB("test-data-4", database, "database");
+        writeRecordsToMongoDB("test-data-4", dbName, "database");
 
         expected =
                 Arrays.asList(
@@ -237,4 +239,42 @@ public class MongoDBSyncDatabaseActionITCase extends 
MongoDBActionITCaseBase {
                         "+I[610000000000000000000103, 
youtube#videoListResponse, 
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", 
{\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1},
 
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":
 [...]
         waitForResult(expected2, table2, rowType2, primaryKeys2);
     }
+
+    @Test
+    @Timeout(60)
+    public void testTableAffix() throws Exception {
+        // create table t1
+        createFileStoreTable(
+                "test_prefix_t1_test_suffix",
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "name", "description", "weight"}),
+                Collections.emptyList(),
+                Collections.singletonList("_id"),
+                Collections.emptyMap());
+
+        // try synchronization
+        String dbName = database + UUID.randomUUID();
+        writeRecordsToMongoDB("test-data-1", dbName, "database");
+        writeRecordsToMongoDB("test-data-2", dbName, "database");
+
+        Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+        mongodbConfig.put("database", dbName);
+        MongoDBSyncDatabaseAction action =
+                syncDatabaseActionBuilder(mongodbConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withTablePrefix("test_prefix_")
+                        .withTableSuffix("_test_suffix")
+                        // test including check with affix
+                        
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testSchemaEvolutionImpl("test_prefix_t1_test_suffix", 
"test_prefix_t2_test_suffix", dbName);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 8990a73b7..265c4ea37 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -298,6 +298,8 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         .withTableConfig(getBasicTableConfig())
                         .withTablePrefix("test_prefix_")
                         .withTableSuffix("_test_suffix")
+                        // test including check with affix
+                        
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
                         .build();
         runActionWithDefaultEnv(action);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
similarity index 100%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
rename to 
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index 8e6f0e840..9bea87fcc 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -162,14 +162,6 @@ under the License.
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-test-utils</artifactId>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 8ce822bc2..15948d6d5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -45,7 +45,6 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
-import static 
org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 
 /**
@@ -221,7 +220,7 @@ public class MultiTablesStoreCompactOperator
                     // table not found, waiting until table is created by
                     //     upstream operators
                 }
-                Thread.sleep(RETRY_SLEEP_TIME.defaultValue().toMillis());
+                Thread.sleep(500);
             }
         }
         return table;

Reply via email to