This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 130c0a0b [improve]Improve the hardcode of tableConfig (#449)
130c0a0b is described below

commit 130c0a0b0f3d4dfffe7df5fd9a93a1fb740e22c4
Author: wudongliang <[email protected]>
AuthorDate: Tue Jul 30 10:09:08 2024 +0800

    [improve]Improve the hardcode of tableConfig (#449)
---
 .../apache/doris/flink/catalog/DorisCatalog.java   |  3 +-
 .../flink/catalog/doris/DorisSchemaFactory.java    | 38 +++-------
 .../doris/flink/catalog/doris/DorisSystem.java     |  9 +--
 .../flink/sink/schema/SQLParserSchemaManager.java  |  5 +-
 .../serializer/JsonDebeziumSchemaSerializer.java   | 21 ++++--
 .../jsondebezium/JsonDebeziumChangeContext.java    | 18 +++--
 .../jsondebezium/JsonDebeziumSchemaChange.java     |  2 +
 .../JsonDebeziumSchemaChangeImplV2.java            |  5 +-
 .../jsondebezium/SQLParserSchemaChange.java        |  3 +-
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |  5 +-
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 19 ++---
 .../doris/flink/tools/cdc/DorisTableConfig.java    | 84 ++++++++++++++++++++++
 .../tools/cdc/mongodb/MongoDBDatabaseSync.java     |  2 +-
 .../MongoDBJsonDebeziumSchemaSerializer.java       | 22 ++++--
 .../catalog/doris/DorisSchemaFactoryTest.java      | 29 +++-----
 .../doris/flink/catalog/doris/DorisSystemTest.java |  4 +-
 .../sink/schema/SQLParserSchemaManagerTest.java    | 19 ++---
 .../TestJsonDebeziumSchemaChangeImplV2.java        |  3 +-
 .../jsondebezium/TestSQLParserSchemaChange.java    |  3 +-
 .../flink/tools/cdc/DorisTableConfigTest.java      | 46 ++++++++++++
 20 files changed, 232 insertions(+), 108 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index 79d6b2d7..ca7dc114 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -56,6 +56,7 @@ import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisConnectionOptions;
 import org.apache.doris.flink.table.DorisDynamicTableFactory;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -359,7 +360,7 @@ public class DorisCatalog extends AbstractCatalog {
                         tablePath.getObjectName(),
                         getCreateDorisColumns(table.getSchema()),
                         primaryKeys,
-                        getCreateTableProps(options),
+                        new DorisTableConfig(getCreateTableProps(options)),
                         table.getComment());
 
         dorisSystem.createTable(schema);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
index b621cfdc..3f824a1f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
@@ -21,13 +21,14 @@ import org.apache.flink.annotation.VisibleForTesting;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 /**
@@ -37,14 +38,13 @@ import java.util.regex.Pattern;
  * factory
  */
 public class DorisSchemaFactory {
-    private static Map<String, Integer> tableBucketMap;
 
     public static TableSchema createTableSchema(
             String database,
             String table,
             Map<String, FieldSchema> columnFields,
             List<String> pkKeys,
-            Map<String, String> tableProperties,
+            DorisTableConfig dorisTableConfig,
             String tableComment) {
         TableSchema tableSchema = new TableSchema();
         tableSchema.setDatabase(database);
@@ -55,14 +55,10 @@ public class DorisSchemaFactory {
         tableSchema.setKeys(buildKeys(pkKeys, columnFields));
         tableSchema.setTableComment(tableComment);
         tableSchema.setDistributeKeys(buildDistributeKeys(pkKeys, 
columnFields));
-        tableSchema.setProperties(tableProperties);
-        if (tableProperties.containsKey("table-buckets")) {
-            if (MapUtils.isEmpty(tableBucketMap)) {
-                String tableBucketsConfig = 
tableProperties.get("table-buckets");
-                tableBucketMap = buildTableBucketMap(tableBucketsConfig);
-            }
-            Integer buckets = parseTableSchemaBuckets(tableBucketMap, table);
-            tableSchema.setTableBuckets(buckets);
+        if (Objects.nonNull(dorisTableConfig)) {
+            tableSchema.setProperties(dorisTableConfig.getTableProperties());
+            tableSchema.setTableBuckets(
+                    
parseTableSchemaBuckets(dorisTableConfig.getTableBuckets(), table));
         }
         return tableSchema;
     }
@@ -92,7 +88,7 @@ public class DorisSchemaFactory {
     @VisibleForTesting
     public static Integer parseTableSchemaBuckets(
             Map<String, Integer> tableBucketsMap, String tableName) {
-        if (tableBucketsMap != null) {
+        if (MapUtils.isNotEmpty(tableBucketsMap)) {
             // Firstly, if the table name is in the table-buckets map, set the 
buckets of the table.
             if (tableBucketsMap.containsKey(tableName)) {
                 return tableBucketsMap.get(tableName);
@@ -107,22 +103,4 @@ public class DorisSchemaFactory {
         }
         return null;
     }
-
-    /**
-     * Build table bucket Map.
-     *
-     * @param tableBuckets the string of tableBuckets, 
eg:student:10,student_info:20,student.*:30
-     * @return The table name and buckets map. The key is table name, the 
value is buckets.
-     */
-    @VisibleForTesting
-    public static Map<String, Integer> buildTableBucketMap(String 
tableBuckets) {
-        Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
-        String[] tableBucketsArray = tableBuckets.split(",");
-        for (String tableBucket : tableBucketsArray) {
-            String[] tableBucketArray = tableBucket.split(":");
-            tableBucketsMap.put(
-                    tableBucketArray[0].trim(), 
Integer.parseInt(tableBucketArray[1].trim()));
-        }
-        return tableBucketsMap;
-    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 0d33eb9f..be6572d3 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -50,7 +50,6 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 public class DorisSystem implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisSystem.class);
-    private static final String TABLE_BUCKETS = "table-buckets";
     private final JdbcConnectionProvider jdbcConnectionProvider;
     private static final List<String> builtinDatabases =
             Collections.singletonList("information_schema");
@@ -200,13 +199,7 @@ public class DorisSystem implements Serializable {
         }
         // append properties
         int index = 0;
-        int skipProNum = 0;
         for (Map.Entry<String, String> entry : properties.entrySet()) {
-            // skip table-buckets
-            if (entry.getKey().equals(TABLE_BUCKETS)) {
-                skipProNum++;
-                continue;
-            }
             if (index == 0) {
                 sb.append(" PROPERTIES (");
             }
@@ -218,7 +211,7 @@ public class DorisSystem implements Serializable {
                     .append(quoteProperties(entry.getValue()));
             index++;
 
-            if (index == (schema.getProperties().size() - skipProNum)) {
+            if (index == (schema.getProperties().size())) {
                 sb.append(")");
             }
         }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
index c44f7433..9289c886 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -35,6 +35,7 @@ import 
org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,7 @@ public class SQLParserSchemaManager implements Serializable 
{
             SourceConnector sourceConnector,
             String ddl,
             String dorisTable,
-            Map<String, String> tableProperties) {
+            DorisTableConfig dorisTableConfig) {
         try {
             Statement statement = CCJSqlParserUtil.parse(ddl);
             if (statement instanceof CreateTable) {
@@ -144,7 +145,7 @@ public class SQLParserSchemaManager implements Serializable 
{
                         dbTable[1],
                         columnFields,
                         pkKeys,
-                        tableProperties,
+                        dorisTableConfig,
                         
extractTableComment(createTable.getTableOptionsStrings()));
             } else {
                 LOG.warn(
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index c1ed1de1..83dec58b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -37,6 +37,7 @@ import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSc
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.SQLParserSchemaChange;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +73,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     // <cdc db.schema.table, doris db.table>
     private Map<String, String> tableMapping;
     // create table properties
-    private Map<String, String> tableProperties;
+    private DorisTableConfig dorisTableConfig;
     private String targetDatabase;
     private String targetTablePrefix;
     private String targetTableSuffix;
@@ -120,18 +121,18 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             boolean newSchemaChange,
             DorisExecutionOptions executionOptions,
             Map<String, String> tableMapping,
-            Map<String, String> tableProperties,
+            DorisTableConfig dorisTableConfig,
             String targetDatabase,
             String targetTablePrefix,
             String targetTableSuffix,
             SchemaChangeMode schemaChangeMode) {
         this(dorisOptions, pattern, sourceTableName, newSchemaChange, 
executionOptions);
         this.tableMapping = tableMapping;
-        this.tableProperties = tableProperties;
         this.targetDatabase = targetDatabase;
         this.targetTablePrefix = targetTablePrefix;
         this.targetTableSuffix = targetTableSuffix;
         this.schemaChangeMode = schemaChangeMode;
+        this.dorisTableConfig = dorisTableConfig;
         init();
     }
 
@@ -142,7 +143,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                         tableMapping,
                         sourceTableName,
                         targetDatabase,
-                        tableProperties,
+                        dorisTableConfig,
                         objectMapper,
                         pattern,
                         lineDelimiter,
@@ -225,7 +226,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         private SchemaChangeMode schemaChangeMode;
         private DorisExecutionOptions executionOptions;
         private Map<String, String> tableMapping;
-        private Map<String, String> tableProperties;
+        private DorisTableConfig dorisTableConfig;
         private String targetDatabase;
         private String targetTablePrefix = "";
         private String targetTableSuffix = "";
@@ -268,8 +269,14 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             return this;
         }
 
+        @Deprecated
         public Builder setTableProperties(Map<String, String> tableProperties) 
{
-            this.tableProperties = tableProperties;
+            this.dorisTableConfig = new DorisTableConfig(tableProperties);
+            return this;
+        }
+
+        public Builder setDorisTableConf(DorisTableConfig dorisTableConfig) {
+            this.dorisTableConfig = dorisTableConfig;
             return this;
         }
 
@@ -300,7 +307,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                     newSchemaChange,
                     executionOptions,
                     tableMapping,
-                    tableProperties,
+                    dorisTableConfig,
                     targetDatabase,
                     targetTablePrefix,
                     targetTableSuffix,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index 2a3eebe0..d1326c72 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -19,9 +19,12 @@ package 
org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 /** Record the context of schema change and data change during serialization. 
*/
@@ -33,7 +36,7 @@ public class JsonDebeziumChangeContext implements 
Serializable {
     private final String sourceTableName;
     private final String targetDatabase;
     // create table properties
-    private final Map<String, String> tableProperties;
+    private final DorisTableConfig dorisTableConfig;
     private final ObjectMapper objectMapper;
     private final Pattern pattern;
     private final String lineDelimiter;
@@ -46,7 +49,7 @@ public class JsonDebeziumChangeContext implements 
Serializable {
             Map<String, String> tableMapping,
             String sourceTableName,
             String targetDatabase,
-            Map<String, String> tableProperties,
+            DorisTableConfig dorisTableConfig,
             ObjectMapper objectMapper,
             Pattern pattern,
             String lineDelimiter,
@@ -57,7 +60,7 @@ public class JsonDebeziumChangeContext implements 
Serializable {
         this.tableMapping = tableMapping;
         this.sourceTableName = sourceTableName;
         this.targetDatabase = targetDatabase;
-        this.tableProperties = tableProperties;
+        this.dorisTableConfig = dorisTableConfig;
         this.objectMapper = objectMapper;
         this.pattern = pattern;
         this.lineDelimiter = lineDelimiter;
@@ -82,8 +85,11 @@ public class JsonDebeziumChangeContext implements 
Serializable {
         return targetDatabase;
     }
 
+    @Deprecated
     public Map<String, String> getTableProperties() {
-        return tableProperties;
+        return Objects.nonNull(dorisTableConfig)
+                ? dorisTableConfig.getTableProperties()
+                : new HashMap<>();
     }
 
     public ObjectMapper getObjectMapper() {
@@ -109,4 +115,8 @@ public class JsonDebeziumChangeContext implements 
Serializable {
     public String getTargetTableSuffix() {
         return targetTableSuffix;
     }
+
+    public DorisTableConfig getDorisTableConf() {
+        return dorisTableConfig;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index 15817da6..91be21fa 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -30,6 +30,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.sink.schema.SchemaChangeManager;
 import org.apache.doris.flink.sink.writer.EventType;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.slf4j.Logger;
@@ -69,6 +70,7 @@ public abstract class JsonDebeziumSchemaChange extends 
CdcSchemaChange {
     protected String targetDatabase;
     protected String targetTablePrefix;
     protected String targetTableSuffix;
+    protected DorisTableConfig dorisTableConfig;
 
     public abstract boolean schemaChange(JsonNode recordRoot);
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 3f985801..94605105 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -68,7 +68,6 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
     // schemaChange saves table names, field, and field column information
     private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new 
LinkedHashMap<>();
     // create table properties
-    private final Map<String, String> tableProperties;
     private final Set<String> filledTables = new HashSet<>();
 
     public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext 
changeContext) {
@@ -78,7 +77,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
         this.dorisOptions = changeContext.getDorisOptions();
         this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
         this.targetDatabase = changeContext.getTargetDatabase();
-        this.tableProperties = changeContext.getTableProperties();
+        this.dorisTableConfig = changeContext.getDorisTableConf();
         this.tableMapping = changeContext.getTableMapping();
         this.objectMapper = changeContext.getObjectMapper();
         this.targetTablePrefix =
@@ -233,7 +232,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
         Preconditions.checkArgument(dbTable.length == 2);
 
         return DorisSchemaFactory.createTableSchema(
-                dbTable[0], dbTable[1], fields, pkList, tableProperties, 
tblComment);
+                dbTable[0], dbTable[1], fields, pkList, dorisTableConfig, 
tblComment);
     }
 
     private boolean checkSchemaChange(String database, String table, DDLSchema 
ddlSchema)
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index 71d4edfe..5bf245ee 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -44,6 +44,7 @@ public class SQLParserSchemaChange extends 
JsonDebeziumSchemaChange {
         this.tableMapping = changeContext.getTableMapping();
         this.objectMapper = changeContext.getObjectMapper();
         this.targetDatabase = changeContext.getTargetDatabase();
+        this.dorisTableConfig = changeContext.getDorisTableConf();
         this.targetTablePrefix =
                 changeContext.getTargetTablePrefix() == null
                         ? ""
@@ -106,7 +107,7 @@ public class SQLParserSchemaChange extends 
JsonDebeziumSchemaChange {
         String ddl = extractJsonNode(historyRecord, "ddl");
         extractSourceConnector(record);
         return sqlParserSchemaManager.parseCreateTableStatement(
-                sourceConnector, ddl, dorisTable, 
changeContext.getTableProperties());
+                sourceConnector, ddl, dorisTable, dorisTableConfig);
     }
 
     @VisibleForTesting
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index c91a0734..ec3b4523 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -142,7 +142,8 @@ public class CdcTools {
 
         Preconditions.checkArgument(params.has(DatabaseSyncConfig.SINK_CONF));
         Map<String, String> sinkMap = getConfigMap(params, 
DatabaseSyncConfig.SINK_CONF);
-        Map<String, String> tableMap = getConfigMap(params, 
DatabaseSyncConfig.TABLE_CONF);
+        DorisTableConfig tableConfig =
+                new DorisTableConfig(getConfigMap(params, 
DatabaseSyncConfig.TABLE_CONF));
         Configuration sinkConfig = Configuration.fromMap(sinkMap);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -158,7 +159,7 @@ public class CdcTools {
                 .setMultiToOneTarget(multiToOneTarget)
                 .setIgnoreDefaultValue(ignoreDefaultValue)
                 .setSinkConfig(sinkConfig)
-                .setTableConfig(tableMap)
+                .setTableConfig(tableConfig)
                 .setCreateTableOnly(createTableOnly)
                 .setSingleSink(singleSink)
                 .setIgnoreIncompatible(ignoreIncompatible)
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 1e66dd4c..e565d0a7 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -61,7 +61,6 @@ import java.util.regex.Pattern;
 
 public abstract class DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(DatabaseSync.class);
-    private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
     private static final String TABLE_NAME_OPTIONS = "table-name";
 
     protected Configuration config;
@@ -72,7 +71,7 @@ public abstract class DatabaseSync {
     protected Pattern includingPattern;
     protected Pattern excludingPattern;
     protected Map<Pattern, String> multiToOneRulesPattern;
-    protected Map<String, String> tableConfig = new HashMap<>();
+    protected DorisTableConfig dorisTableConfig;
     protected Configuration sinkConfig;
     protected boolean ignoreDefaultValue;
     protected boolean ignoreIncompatible;
@@ -110,10 +109,6 @@ public abstract class DatabaseSync {
         this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
         this.multiToOneRulesPattern = multiToOneRulesParser(multiToOneOrigin, 
multiToOneTarget);
         this.converter = new TableNameConverter(tablePrefix, tableSuffix, 
multiToOneRulesPattern);
-        // default enable light schema change
-        if (!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
-            this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
-        }
     }
 
     public void build() throws Exception {
@@ -336,7 +331,7 @@ public abstract class DatabaseSync {
                 .setSchemaChangeMode(schemaChangeMode)
                 .setExecutionOptions(executionOptions)
                 .setTableMapping(tableMapping)
-                .setTableProperties(tableConfig)
+                .setDorisTableConf(dorisTableConfig)
                 .setTargetDatabase(database)
                 .setTargetTablePrefix(tablePrefix)
                 .setTargetTableSuffix(tableSuffix)
@@ -468,7 +463,7 @@ public abstract class DatabaseSync {
                             dorisTable,
                             schema.getFields(),
                             schema.getPrimaryKeys(),
-                            tableConfig,
+                            dorisTableConfig,
                             schema.getTableComment());
             try {
                 dorisSystem.createTable(dorisSchema);
@@ -523,13 +518,19 @@ public abstract class DatabaseSync {
         return this;
     }
 
+    @Deprecated
     public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
         if (!CollectionUtil.isNullOrEmpty(tableConfig)) {
-            this.tableConfig = tableConfig;
+            this.dorisTableConfig = new DorisTableConfig(tableConfig);
         }
         return this;
     }
 
+    public DatabaseSync setTableConfig(DorisTableConfig tableConfig) {
+        this.dorisTableConfig = tableConfig;
+        return this;
+    }
+
     public DatabaseSync setSinkConfig(Configuration sinkConfig) {
         this.sinkConfig = sinkConfig;
         return this;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
new file mode 100644
index 00000000..912ed698
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class DorisTableConfig implements Serializable {
+    private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
+    // PROPERTIES parameters of the Doris CREATE TABLE statement, such as 
replication_num=1.
+    private final Map<String, String> tableProperties;
+    // Specific parameters extracted from --table-conf that must be parsed 
and integrated into the
+    // Doris CREATE TABLE statement, such as 
table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50".
+    private Map<String, Integer> tableBuckets;
+
+    // Only for testing
+    @VisibleForTesting
+    public DorisTableConfig() {
+        tableProperties = new HashMap<>();
+        tableBuckets = new HashMap<>();
+    }
+
+    public DorisTableConfig(Map<String, String> tableConfig) {
+        if (Objects.isNull(tableConfig)) {
+            tableConfig = new HashMap<>();
+        }
+        // default enable light schema change
+        if (!tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
+            tableConfig.put(LIGHT_SCHEMA_CHANGE, Boolean.toString(true));
+        }
+        if (tableConfig.containsKey(DatabaseSyncConfig.TABLE_BUCKETS)) {
+            this.tableBuckets =
+                    
buildTableBucketMap(tableConfig.get(DatabaseSyncConfig.TABLE_BUCKETS));
+            tableConfig.remove(DatabaseSyncConfig.TABLE_BUCKETS);
+        }
+        tableProperties = tableConfig;
+    }
+
+    public Map<String, Integer> getTableBuckets() {
+        return tableBuckets;
+    }
+
+    public Map<String, String> getTableProperties() {
+        return tableProperties;
+    }
+
+    /**
+     * Build table bucket Map.
+     *
+     * @param tableBuckets the string of tableBuckets, 
eg:student:10,student_info:20,student.*:30
+     * @return The table name and buckets map. The key is table name, the 
value is buckets.
+     */
+    @VisibleForTesting
+    public Map<String, Integer> buildTableBucketMap(String tableBuckets) {
+        Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
+        String[] tableBucketsArray = tableBuckets.split(",");
+        for (String tableBucket : tableBucketsArray) {
+            String[] tableBucketArray = tableBucket.split(":");
+            tableBucketsMap.put(
+                    tableBucketArray[0].trim(), 
Integer.parseInt(tableBucketArray[1].trim()));
+        }
+        return tableBucketsMap;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index 8e406f21..eac6acc4 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -208,7 +208,7 @@ public class MongoDBDatabaseSync extends DatabaseSync {
                 .setDorisOptions(dorisBuilder.build())
                 .setExecutionOptions(executionOptions)
                 .setTableMapping(tableMapping)
-                .setTableProperties(tableConfig)
+                .setTableConf(dorisTableConfig)
                 .setTargetDatabase(database)
                 .build();
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
index 5c9c1b9e..b7faec2f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -28,6 +28,7 @@ import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +53,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
     // <cdc db.schema.table, doris db.table>
     private Map<String, String> tableMapping;
     // create table properties
-    private Map<String, String> tableProperties;
+    private DorisTableConfig dorisTableConfig;
     private String targetDatabase;
 
     private CdcDataChange dataChange;
@@ -67,7 +68,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
             String sourceTableName,
             DorisExecutionOptions executionOptions,
             Map<String, String> tableMapping,
-            Map<String, String> tableProperties,
+            DorisTableConfig dorisTableConfig,
             String targetDatabase,
             String targetTablePrefix,
             String targetTableSuffix) {
@@ -79,7 +80,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
         JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
         this.objectMapper.setNodeFactory(jsonNodeFactory);
         this.tableMapping = tableMapping;
-        this.tableProperties = tableProperties;
+        this.dorisTableConfig = dorisTableConfig;
         this.targetDatabase = targetDatabase;
         this.targetTablePrefix = targetTablePrefix;
         this.targetTableSuffix = targetTableSuffix;
@@ -100,7 +101,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
                         tableMapping,
                         sourceTableName,
                         targetDatabase,
-                        tableProperties,
+                        dorisTableConfig,
                         objectMapper,
                         pattern,
                         lineDelimiter,
@@ -138,7 +139,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
         private String sourceTableName;
         private DorisExecutionOptions executionOptions;
         private Map<String, String> tableMapping;
-        private Map<String, String> tableProperties;
+        private DorisTableConfig dorisTableConfig;
         private String targetDatabase;
         private String targetTablePrefix = "";
         private String targetTableSuffix = "";
@@ -172,9 +173,16 @@ public class MongoDBJsonDebeziumSchemaSerializer 
implements DorisRecordSerialize
             return this;
         }
 
+        @Deprecated
         public MongoDBJsonDebeziumSchemaSerializer.Builder setTableProperties(
                 Map<String, String> tableProperties) {
-            this.tableProperties = tableProperties;
+            this.dorisTableConfig = new DorisTableConfig(tableProperties);
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setTableConf(
+                DorisTableConfig dorisTableConfig) {
+            this.dorisTableConfig = dorisTableConfig;
             return this;
         }
 
@@ -191,7 +199,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
                     sourceTableName,
                     executionOptions,
                     tableMapping,
-                    tableProperties,
+                    dorisTableConfig,
                     targetDatabase,
                     targetTablePrefix,
                     targetTableSuffix);
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java
index 29e8d461..1bc1f115 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.catalog.doris;
 
 import org.apache.flink.util.Preconditions;
 
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,8 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-
 public class DorisSchemaFactoryTest {
 
     @Test
@@ -65,10 +64,10 @@ public class DorisSchemaFactoryTest {
                         dbTable[1],
                         columnFields,
                         pkKeys,
-                        tableProperties,
+                        new DorisTableConfig(tableProperties),
                         tableComment);
         Assert.assertEquals(
-                "TableSchema{database='doris', table='create_tab', 
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', 
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, 
id=FieldSchema{name='id', typeString='INT', defaultValue='100', 
comment='int_test'}, age=FieldSchema{name='age', typeString='INT', 
defaultValue='null', comment='null'}, email=FieldSchema{name='email', 
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}}, 
keys=emai [...]
+                "TableSchema{database='doris', table='create_tab', 
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', 
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, 
id=FieldSchema{name='id', typeString='INT', defaultValue='100', 
comment='int_test'}, age=FieldSchema{name='age', typeString='INT', 
defaultValue='null', comment='null'}, email=FieldSchema{name='email', 
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}}, 
keys=emai [...]
                 tableSchema.toString());
     }
 
@@ -86,6 +85,7 @@ public class DorisSchemaFactoryTest {
         List<String> pkKeys = Collections.singletonList("email");
         Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put("table-buckets", "create_tab:40, create_taba:10, tabs:12");
+        tableProperties.put("replication_num", "2");
         String tableComment = "auto_tab_comment";
         TableSchema tableSchema =
                 DorisSchemaFactory.createTableSchema(
@@ -93,11 +93,10 @@ public class DorisSchemaFactoryTest {
                         dbTable[1],
                         columnFields,
                         pkKeys,
-                        tableProperties,
+                        new DorisTableConfig(tableProperties),
                         tableComment);
-
         Assert.assertEquals(
-                "TableSchema{database='doris', table='create_tab', 
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', 
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, 
id=FieldSchema{name='id', typeString='INT', defaultValue='100', 
comment='int_test'}, age=FieldSchema{name='age', typeString='INT', 
defaultValue='null', comment='null'}, email=FieldSchema{name='email', 
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}}, 
keys=emai [...]
+                "TableSchema{database='doris', table='create_tab', 
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', 
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, 
id=FieldSchema{name='id', typeString='INT', defaultValue='100', 
comment='int_test'}, age=FieldSchema{name='age', typeString='INT', 
defaultValue='null', comment='null'}, email=FieldSchema{name='email', 
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}}, 
keys=emai [...]
                 tableSchema.toString());
     }
 
@@ -113,6 +112,7 @@ public class DorisSchemaFactoryTest {
         columnFields.put("age", new FieldSchema("age", "INT", null, null));
         columnFields.put("email", new FieldSchema("email", "VARCHAR(100)", 
"[email protected]", "e"));
         Map<String, String> tableProperties = new HashMap<>();
+        tableProperties.put("replication_num", "1");
         String tableComment = "auto_tab_comment";
         TableSchema tableSchema =
                 DorisSchemaFactory.createTableSchema(
@@ -120,21 +120,10 @@ public class DorisSchemaFactoryTest {
                         dbTable[1],
                         columnFields,
                         new ArrayList<>(),
-                        tableProperties,
+                        new DorisTableConfig(tableProperties),
                         tableComment);
         Assert.assertEquals(
-                "TableSchema{database='doris', table='dup_tab', 
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', 
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, 
id=FieldSchema{name='id', typeString='INT', defaultValue='100', 
comment='int_test'}, age=FieldSchema{name='age', typeString='INT', 
defaultValue='null', comment='null'}, email=FieldSchema{name='email', 
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}}, 
keys=name, m [...]
+                "TableSchema{database='doris', table='dup_tab', 
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', 
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, 
id=FieldSchema{name='id', typeString='INT', defaultValue='100', 
comment='int_test'}, age=FieldSchema{name='age', typeString='INT', 
defaultValue='null', comment='null'}, email=FieldSchema{name='email', 
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}}, 
keys=name, m [...]
                 tableSchema.toString());
     }
-
-    @Test
-    public void buildTableBucketMapTest() {
-        String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50";
-        Map<String, Integer> tableBucketsMap = DorisSchemaFactory.buildTableBucketMap(tableBuckets);
-        assertEquals(10, tableBucketsMap.get("tbl1").intValue());
-        assertEquals(20, tableBucketsMap.get("tbl2").intValue());
-        assertEquals(30, tableBucketsMap.get("a.*").intValue());
-        assertEquals(40, tableBucketsMap.get("b.*").intValue());
-        assertEquals(50, tableBucketsMap.get(".*").intValue());
-    }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSystemTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSystemTest.java
index df0321f6..65e1240d 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSystemTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSystemTest.java
@@ -80,11 +80,11 @@ public class DorisSystemTest {
         schema.setFields(map);
         schema.setTableBuckets(1);
         Map<String, String> properties = new HashMap<>();
-        properties.put("table-buckets", "1");
+        properties.put("replication_num", "3");
         schema.setProperties(properties);
         String createTableDDL = DorisSystem.buildCreateTableDDL(schema);
         String except =
-                "CREATE TABLE IF NOT EXISTS `db`.`table`(`name` VARCHAR(65533) 
DEFAULT 'zhangsan' COMMENT '' )  DISTRIBUTED BY HASH(`name`) BUCKETS 1";
+                "CREATE TABLE IF NOT EXISTS `db`.`table`(`name` VARCHAR(65533) 
DEFAULT 'zhangsan' COMMENT '' )  DISTRIBUTED BY HASH(`name`) BUCKETS 1 
PROPERTIES ('replication_num'='3')";
         Assert.assertEquals(createTableDDL, except);
     }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
index d101d1e8..d8cb7c3f 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.flink.sink.schema;
 
 import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.junit.Assert;
 import org.junit.Before;
@@ -222,7 +223,7 @@ public class SQLParserSchemaManagerTest {
                         + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_0900_ai_ci";
         TableSchema tableSchema =
                 schemaManager.parseCreateTableStatement(
-                        SourceConnector.MYSQL, ddl, dorisTable, new 
HashMap<>());
+                        SourceConnector.MYSQL, ddl, dorisTable, null);
 
         String expected =
                 "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', 
defaultValue='10000', comment='id_test'}, 
`create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', 
defaultValue='CURRENT_TIMESTAMP', comment='null'}, 
`c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', 
comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', 
typeString='DECIMALV3(9,3)', defaultValue='1.0 [...]
@@ -236,7 +237,7 @@ public class SQLParserSchemaManagerTest {
                 "CREATE TABLE test_sink_unique (     id INT NOT NULL,     name 
VARCHAR(100) NOT NULL,     age INT,     email VARCHAR(100),     UNIQUE (email) 
)";
         TableSchema tableSchema =
                 schemaManager.parseCreateTableStatement(
-                        SourceConnector.MYSQL, ddl, dorisTable, new 
HashMap<>());
+                        SourceConnector.MYSQL, ddl, dorisTable, null);
 
         String expected =
                 "TableSchema{database='doris', table='auto_uni_tab', 
tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', 
defaultValue='null', comment='null'}, name=FieldSchema{name='name', 
typeString='VARCHAR(300)', defaultValue='null', comment='null'}, 
age=FieldSchema{name='age', typeString='INT', defaultValue='null', 
comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', 
defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distribut [...]
@@ -255,7 +256,7 @@ public class SQLParserSchemaManagerTest {
                         + ")";
         TableSchema tableSchema =
                 schemaManager.parseCreateTableStatement(
-                        SourceConnector.MYSQL, ddl, dorisTable, new 
HashMap<>());
+                        SourceConnector.MYSQL, ddl, dorisTable, null);
 
         String expected =
                 "TableSchema{database='doris', table='auto_duptab', 
tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', 
defaultValue='null', comment='null'}, name=FieldSchema{name='name', 
typeString='VARCHAR(150)', defaultValue='null', comment='null'}, 
age=FieldSchema{name='age', typeString='INT', defaultValue='null', 
comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', 
defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distri [...]
@@ -284,7 +285,7 @@ public class SQLParserSchemaManagerTest {
                         + ");";
         TableSchema tableSchema =
                 schemaManager.parseCreateTableStatement(
-                        SourceConnector.ORACLE, ddl, dorisTable, new 
HashMap<>());
+                        SourceConnector.ORACLE, ddl, dorisTable, null);
 
         String expected =
                 "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={employee_id=FieldSchema{name='employee_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', 
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', 
typeString='VARCHAR(150)', defaultValue='null', comment='null'}, 
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', 
com [...]
@@ -310,7 +311,7 @@ public class SQLParserSchemaManagerTest {
                         + ");";
         TableSchema tableSchema =
                 schemaManager.parseCreateTableStatement(
-                        SourceConnector.ORACLE, ddl, dorisTable, new 
HashMap<>());
+                        SourceConnector.ORACLE, ddl, dorisTable, null);
 
         String expected =
                 "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={employee_id=FieldSchema{name='employee_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', 
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', 
typeString='VARCHAR(150)', defaultValue='null', comment='null'}, 
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', 
com [...]
@@ -335,10 +336,12 @@ public class SQLParserSchemaManagerTest {
                         + ");";
         TableSchema tableSchema =
                 schemaManager.parseCreateTableStatement(
-                        SourceConnector.ORACLE, ddl, dorisTable, new 
HashMap<>());
-
+                        SourceConnector.ORACLE,
+                        ddl,
+                        dorisTable,
+                        new DorisTableConfig(new HashMap<>()));
         String expected =
-                "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={order_id=FieldSchema{name='order_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
customer_id=FieldSchema{name='customer_id', typeString='BIGINT', 
defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', 
typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, 
status=FieldSchema{name='status', typeString='VARCHAR(60)', 
defaultValue='null', comment=' [...]
+                "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={order_id=FieldSchema{name='order_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
customer_id=FieldSchema{name='customer_id', typeString='BIGINT', 
defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', 
typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, 
status=FieldSchema{name='status', typeString='VARCHAR(60)', 
defaultValue='null', comment=' [...]
         Assert.assertEquals(expected, tableSchema.toString());
     }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index a3c24d15..68239618 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -26,6 +26,7 @@ import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.junit.After;
 import org.junit.Assert;
@@ -76,7 +77,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
                         tableMapping,
                         sourceTableName,
                         targetDatabase,
-                        tableProperties,
+                        new DorisTableConfig(tableProperties),
                         objectMapper,
                         null,
                         lineDelimiter,
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
index aa1333da..d3194f81 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
@@ -25,7 +25,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 
 public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
@@ -41,7 +40,7 @@ public class TestSQLParserSchemaChange extends 
TestJsonDebeziumChangeBase {
                         tableMapping,
                         null,
                         null,
-                        new HashMap<>(),
+                        null,
                         objectMapper,
                         null,
                         lineDelimiter,
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java
new file mode 100644
index 00000000..1a82e1c6
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class DorisTableConfigTest {
+
+    private DorisTableConfig dorisTableConfig;
+
+    @Before
+    public void init() {
+        dorisTableConfig = new DorisTableConfig();
+    }
+
+    @Test
+    public void buildTableBucketMapTest() {
+        String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50";
+        Map<String, Integer> tableBucketsMap = dorisTableConfig.buildTableBucketMap(tableBuckets);
+        assertEquals(10, tableBucketsMap.get("tbl1").intValue());
+        assertEquals(20, tableBucketsMap.get("tbl2").intValue());
+        assertEquals(30, tableBucketsMap.get("a.*").intValue());
+        assertEquals(40, tableBucketsMap.get("b.*").intValue());
+        assertEquals(50, tableBucketsMap.get(".*").intValue());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to