This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 734662e  [Feature](CDC)Support database sync use single sink (#245)
734662e is described below

commit 734662e18339965ab522957729c97cebc655cfe1
Author: wudi <[email protected]>
AuthorDate: Wed Nov 29 14:37:21 2023 +0800

    [Feature](CDC)Support database sync use single sink (#245)
---
 flink-doris-connector/pom.xml                      |   2 +-
 .../flink/sink/writer/serializer/DorisRecord.java  |  12 ++
 .../serializer/JsonDebeziumSchemaSerializer.java   | 120 +++++++++++++++---
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |  17 ++-
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 140 ++++++++++++++++-----
 .../cdc/{mysql => }/ParsingProcessFunction.java    |   3 +-
 .../apache/doris/flink/tools/cdc/SourceSchema.java |  24 ++++
 .../writer/TestJsonDebeziumSchemaSerializer.java   |  58 +++++++++
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |  16 ++-
 .../tools/cdc/CdcOraclelSyncDatabaseCase.java      |  16 ++-
 .../tools/cdc/CdcPostgresSyncDatabaseCase.java     |  16 ++-
 .../tools/cdc/CdcSqlServerSyncDatabaseCase.java    |  16 ++-
 12 files changed, 387 insertions(+), 53 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 13a7ec4..54dc7ba 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -70,7 +70,7 @@ under the License.
         <revision>1.5.0-SNAPSHOT</revision>
         <flink.version>1.18.0</flink.version>
         <flink.major.version>1.18</flink.major.version>
-        <flink.sql.cdc.version>2.4.1</flink.sql.cdc.version>
+        <flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
         <libthrift.version>0.16.0</libthrift.version>
         <arrow.version>5.0.0</arrow.version>
         <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
index 02ae3bc..fc2f00c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
@@ -67,6 +67,18 @@ public class DorisRecord implements Serializable {
         return new DorisRecord(database, table, row);
     }
 
+    /**
+     * Creates a record addressed to the table named by {@code tableIdentifier}.
+     *
+     * @param tableIdentifier target table in {@code database.table} form; may be null
+     * @param row row bytes carried by the record
+     * @return the new record, or {@code null} when the identifier is null or does not
+     *     split into exactly two dot-separated parts
+     */
+    public static DorisRecord of(String tableIdentifier, byte[] row) {
+        if (tableIdentifier == null) {
+            return null;
+        }
+        String[] parts = tableIdentifier.split("\\.");
+        if (parts.length != 2) {
+            return null;
+        }
+        return new DorisRecord(parts[0], parts[1], row);
+    }
+
     public static DorisRecord of(byte[] row) {
         return new DorisRecord(null, null, row);
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 4dc654e..d87da4c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -33,12 +33,14 @@ import 
org.apache.doris.flink.sink.schema.SchemaChangeHelper;
 import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
 import org.apache.doris.flink.sink.schema.SchemaChangeManager;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
 import org.apache.doris.flink.tools.cdc.oracle.OracleType;
 import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
 import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +88,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private boolean ignoreUpdateBefore = true;
     private SourceConnector sourceConnector;
     private SchemaChangeManager schemaChangeManager;
+    // <cdc db.schema.table, doris db.table>
+    private Map<String, String> tableMapping;
 
     public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
             Pattern pattern,
@@ -93,9 +97,11 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             boolean newSchemaChange) {
         this.dorisOptions = dorisOptions;
         this.addDropDDLPattern = pattern == null ? 
Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
-        String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
-        this.database = tableInfo[0];
-        this.table = tableInfo[1];
+        
if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
+            String[] tableInfo = 
dorisOptions.getTableIdentifier().split("\\.");
+            this.database = tableInfo[0];
+            this.table = tableInfo[1];
+        }
         this.sourceTableName = sourceTableName;
         // Prevent loss of decimal data precision
         
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
@@ -120,10 +126,29 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         }
     }
 
+    /**
+     * Creates a serializer that also carries a table mapping.
+     * Delegates to the five-argument constructor and then stores {@code tableMapping}.
+     *
+     * @param tableMapping map from CDC table identifier (db.schema.table) to Doris
+     *     table identifier (db.table)
+     */
+    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
+            Pattern pattern,
+            String sourceTableName,
+            boolean newSchemaChange,
+            DorisExecutionOptions executionOptions,
+            Map<String, String> tableMapping) {
+        this(dorisOptions, pattern, sourceTableName, newSchemaChange, 
executionOptions);
+        this.tableMapping = tableMapping;
+    }
+
     @Override
     public DorisRecord serialize(String record) throws IOException {
         LOG.debug("received debezium json data {} :", record);
         JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+
+        //Filter out table records that are not in tableMapping
+        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+        String dorisTableIdentifier = 
getDorisTableIdentifier(cdcTableIdentifier);
+        if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
+            LOG.warn("filter table {}, because it is not listened, record 
detail is {}", cdcTableIdentifier, record);
+            return null;
+        }
+
         String op = extractJsonNode(recordRoot, "op");
         if (Objects.isNull(op)) {
             // schema change ddl
@@ -146,7 +171,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                 addDeleteSign(valueMap, false);
                 break;
             case OP_UPDATE:
-                return DorisRecord.of(extractUpdate(recordRoot));
+                return DorisRecord.of(dorisTableIdentifier, 
extractUpdate(recordRoot));
             case OP_DELETE:
                 valueMap = extractBeforeRow(recordRoot);
                 addDeleteSign(valueMap, true);
@@ -155,7 +180,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                 LOG.error("parse record fail, unknown op {} in {}", op, 
record);
                 return null;
         }
-        return 
DorisRecord.of(objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
+
+        return DorisRecord.of(dorisTableIdentifier, 
objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
     }
 
     /**
@@ -187,6 +213,13 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && 
!checkTable(recordRoot)) {
                 return false;
             }
+
+            // db,table
+            Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+            if(tuple == null){
+                return false;
+            }
+
             List<String> ddlSqlList = extractDDLList(recordRoot);
             if (CollectionUtils.isEmpty(ddlSqlList)) {
                 LOG.info("ddl can not do schema change:{}", recordRoot);
@@ -197,8 +230,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             for (int i = 0; i < ddlSqlList.size(); i++) {
                 DDLSchema ddlSchema = ddlSchemas.get(i);
                 String ddlSql = ddlSqlList.get(i);
-                boolean doSchemaChange = checkSchemaChange(ddlSchema);
-                status = doSchemaChange && schemaChangeManager.execute(ddlSql, 
database);
+                boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, 
ddlSchema);
+                status = doSchemaChange && schemaChangeManager.execute(ddlSql, 
tuple.f0);
                 LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
             }
         } catch (Exception ex) {
@@ -209,6 +242,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
 
     @VisibleForTesting
     public List<String> extractDDLList(JsonNode record) throws 
JsonProcessingException {
+        String dorisTable = getDorisTableIdentifier(record);
         JsonNode historyRecord = extractHistoryRecord(record);
         JsonNode tableChanges = historyRecord.get("tableChanges");
         String ddl = extractJsonNode(historyRecord, "ddl");
@@ -233,7 +267,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             String oldColumnName = renameMatcher.group(2);
             String newColumnName = renameMatcher.group(3);
             return SchemaChangeHelper.generateRenameDDLSql(
-                    dorisOptions.getTableIdentifier(), oldColumnName, 
newColumnName, originFieldSchemaMap);
+                    dorisTable, oldColumnName, newColumnName, 
originFieldSchemaMap);
         }
 
         // add/drop ddl
@@ -248,7 +282,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         if (!matcher.find()) {
             return null;
         }
-        return 
SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
+        return SchemaChangeHelper.generateDDLSql(dorisTable);
     }
 
     @VisibleForTesting
@@ -262,13 +296,20 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && 
!checkTable(recordRoot)) {
                 return false;
             }
+            // db,table
+            Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+            if(tuple == null){
+                return false;
+            }
+
             String ddl = extractDDL(recordRoot);
             if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
                 LOG.info("ddl can not do schema change:{}", recordRoot);
                 return false;
             }
-            boolean doSchemaChange = checkSchemaChange(ddl);
-            status = doSchemaChange && schemaChangeManager.execute(ddl, 
database);
+
+            // Argument order must match checkSchemaChange(database, table, ddl);
+            // all three are Strings, so a swapped call compiles but checks the wrong table.
+            boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddl);
+            status = doSchemaChange && schemaChangeManager.execute(ddl, 
tuple.f0);
             LOG.info("schema change status:{}", status);
         } catch (Exception ex) {
             LOG.warn("schema change error :", ex);
@@ -286,12 +327,48 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         return sourceTableName.equals(dbTbl);
     }
 
-    private boolean checkSchemaChange(String ddl) throws IOException, 
IllegalArgumentException {
+    /**
+     * Builds the upstream (CDC) table identifier from the record's {@code source} node.
+     *
+     * @param record change record to inspect
+     * @return the {@code db}, {@code schema} and {@code table} values of the source node,
+     *     combined by {@code SourceSchema.getString}
+     */
+    public String getCdcTableIdentifier(JsonNode record){
+        JsonNode source = record.get("source");
+        String dbName = extractJsonNode(source, "db");
+        String schemaName = extractJsonNode(source, "schema");
+        String tableName = extractJsonNode(source, "table");
+        return SourceSchema.getString(dbName, schemaName, tableName);
+    }
+
+    /**
+     * Resolves the Doris table identifier for an upstream table.
+     *
+     * <p>A non-blank identifier configured in {@code dorisOptions} always wins; otherwise
+     * the identifier is looked up in {@code tableMapping}.
+     *
+     * @param cdcTableIdentifier upstream table identifier used as the mapping key
+     * @return the Doris table identifier, or {@code null} when none is configured or mapped
+     */
+    public String getDorisTableIdentifier(String cdcTableIdentifier){
+        String configured = dorisOptions.getTableIdentifier();
+        if (!StringUtils.isNullOrWhitespaceOnly(configured)) {
+            return configured;
+        }
+        boolean lookupImpossible = CollectionUtil.isNullOrEmpty(tableMapping)
+                || StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier);
+        // Map.get yields null for an unmapped key, which is the "not found" result.
+        return lookupImpossible ? null : tableMapping.get(cdcTableIdentifier);
+    }
+
+    /**
+     * Resolves the Doris table identifier for the table a change record belongs to.
+     *
+     * @param record change record whose source table is looked up
+     * @return the Doris table identifier, or {@code null} when it cannot be resolved
+     */
+    protected String getDorisTableIdentifier(JsonNode record){
+        return getDorisTableIdentifier(getCdcTableIdentifier(record));
+    }
+
+    /**
+     * Resolves the Doris target of a change record as a (database, table) pair.
+     *
+     * @param record change record whose target table is resolved
+     * @return the pair, or {@code null} when no identifier is resolved or it does not
+     *     split into exactly two dot-separated parts
+     */
+    protected Tuple2<String, String> getDorisTableTuple(JsonNode record){
+        String dorisTable = getDorisTableIdentifier(record);
+        String[] parts = StringUtils.isNullOrWhitespaceOnly(dorisTable)
+                ? null
+                : dorisTable.split("\\.");
+        if (parts != null && parts.length == 2) {
+            return Tuple2.of(parts[0], parts[1]);
+        }
+        return null;
+    }
+
+    private boolean checkSchemaChange(String database, String table, String 
ddl) throws IOException, IllegalArgumentException {
         Map<String, Object> param = buildRequestParam(ddl);
         return schemaChangeManager.checkSchemaChange(database, table, param);
     }
 
-    private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, 
IllegalArgumentException {
+    private boolean checkSchemaChange(String database, String table, DDLSchema 
ddlSchema) throws IOException, IllegalArgumentException {
         Map<String, Object> param = 
SchemaChangeManager.buildRequestParam(ddlSchema.isDropColumn(), 
ddlSchema.getColumnName());
         return schemaChangeManager.checkSchemaChange(database, table, param);
     }
@@ -328,8 +405,6 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         return extractJsonNode(record.get("source"), "table");
     }
 
-
-
     private String extractJsonNode(JsonNode record, String key) {
         return record != null && record.get(key) != null &&
                 !(record.get(key) instanceof NullNode) ? 
record.get(key).asText() : null;
@@ -369,7 +444,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                 String col = matcher.group(3);
                 String type = matcher.group(5);
                 type = handleType(type);
-                ddl = String.format(EXECUTE_DDL, 
dorisOptions.getTableIdentifier(), op, col, type);
+                ddl = String.format(EXECUTE_DDL, 
getDorisTableIdentifier(record), op, col, type);
                 LOG.info("parse ddl:{}", ddl);
                 return ddl;
             }
@@ -475,6 +550,11 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         this.sourceConnector = 
SourceConnector.valueOf(sourceConnector.toUpperCase());
     }
 
+    /**
+     * Replaces the table mapping used to resolve Doris table identifiers.
+     *
+     * @param tableMapping map from CDC table identifier to Doris table identifier
+     */
+    @VisibleForTesting
+    public void setTableMapping(Map<String, String> tableMapping) {
+        this.tableMapping = tableMapping;
+    }
+
     public static JsonDebeziumSchemaSerializer.Builder builder() {
         return new JsonDebeziumSchemaSerializer.Builder();
     }
@@ -488,6 +568,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         private String sourceTableName;
         private boolean newSchemaChange;
         private DorisExecutionOptions executionOptions;
+        private Map<String, String> tableMapping;
 
         public JsonDebeziumSchemaSerializer.Builder 
setDorisOptions(DorisOptions dorisOptions) {
             this.dorisOptions = dorisOptions;
@@ -514,9 +595,14 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             return this;
         }
 
+        /**
+         * Sets the table mapping handed to the serializer by {@code build()}.
+         *
+         * @param tableMapping map from CDC table identifier to Doris table identifier
+         * @return this builder
+         */
+        public Builder setTableMapping(Map<String, String> tableMapping) {
+            this.tableMapping = tableMapping;
+            return this;
+        }
+
         public JsonDebeziumSchemaSerializer build() {
             return new JsonDebeziumSchemaSerializer(dorisOptions, 
addDropDDLPattern, sourceTableName, newSchemaChange,
-                    executionOptions);
+                    executionOptions, tableMapping);
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 8a8b3db..d05fa15 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -108,13 +108,28 @@ public class CdcTools {
         boolean createTableOnly = params.has("create-table-only");
         boolean ignoreDefaultValue = params.has("ignore-default-value");
         boolean useNewSchemaChange = params.has("use-new-schema-change");
+        boolean singleSink = params.has("single-sink");
 
         Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
         Map<String, String> tableMap = getConfigMap(params, "table-conf");
         Configuration sinkConfig = Configuration.fromMap(sinkMap);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        databaseSync.create(env, database, config, tablePrefix, tableSuffix, 
includingTables, excludingTables,multiToOneOrigin,multiToOneTarget, 
ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
+        // Pass every parsed option on; single-sink must be forwarded here or the
+        // "single-sink" flag parsed above has no effect on the job being built.
+        databaseSync.setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setTablePrefix(tablePrefix)
+                .setTableSuffix(tableSuffix)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setMultiToOneOrigin(multiToOneOrigin)
+                .setMultiToOneTarget(multiToOneTarget)
+                .setIgnoreDefaultValue(ignoreDefaultValue)
+                .setSinkConfig(sinkConfig)
+                .setTableConfig(tableMap)
+                .setCreateTableOnly(createTableOnly)
+                .setNewSchemaChange(useNewSchemaChange)
+                .setSingleSink(singleSink)
+                .create();
         databaseSync.build();
         if(StringUtils.isNullOrWhitespaceOnly(jobName)){
             jobName = String.format("%s-Doris Sync Database: %s", type, 
config.getString("database-name","db"));
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index f1579b0..4dfa6d5 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -25,7 +25,6 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
 import 
org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
 import org.apache.doris.flink.table.DorisConfigOptions;
-import org.apache.doris.flink.tools.cdc.mysql.ParsingProcessFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -60,7 +59,7 @@ public abstract class DatabaseSync {
     protected Pattern includingPattern;
     protected Pattern excludingPattern;
     protected Map<Pattern, String> multiToOneRulesPattern;
-    protected Map<String, String> tableConfig;
+    protected Map<String, String> tableConfig = new HashMap<>();
     protected Configuration sinkConfig;
     protected boolean ignoreDefaultValue;
 
@@ -71,6 +70,10 @@ public abstract class DatabaseSync {
     protected String excludingTables;
     protected String multiToOneOrigin;
     protected String multiToOneTarget;
+    protected String tablePrefix;
+    protected String tableSuffix;
+    protected boolean singleSink;
+    private Map<String, String> tableMapping = new HashMap<>();
 
     public abstract void registerDriver() throws SQLException;
 
@@ -84,30 +87,15 @@ public abstract class DatabaseSync {
         registerDriver();
     }
 
-    public void create(StreamExecutionEnvironment env, String database, 
Configuration config,
-                       String tablePrefix, String tableSuffix, String 
includingTables,
-                       String excludingTables,String multiToOneOrigin,String 
multiToOneTarget, boolean ignoreDefaultValue, Configuration sinkConfig,
-            Map<String, String> tableConfig, boolean createTableOnly, boolean 
useNewSchemaChange) {
-        this.env = env;
-        this.config = config;
-        this.database = database;
-        this.includingTables = includingTables;
-        this.excludingTables = excludingTables;
-        this.multiToOneOrigin = multiToOneOrigin;
-        this.multiToOneTarget = multiToOneTarget;
+    public void create() {
         this.includingPattern = includingTables == null ? null : 
Pattern.compile(includingTables);
         this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
         this.multiToOneRulesPattern = 
multiToOneRulesParser(multiToOneOrigin,multiToOneTarget);
         this.converter = new TableNameConverter(tablePrefix, 
tableSuffix,multiToOneRulesPattern);
-        this.ignoreDefaultValue = ignoreDefaultValue;
-        this.sinkConfig = sinkConfig;
-        this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
         //default enable light schema change
         if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){
             this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
         }
-        this.createTableOnly = createTableOnly;
-        this.newSchemaChange = useNewSchemaChange;
     }
 
     public void build() throws Exception {
@@ -125,7 +113,10 @@ public abstract class DatabaseSync {
         List<String> dorisTables = new ArrayList<>();
         for (SourceSchema schema : schemaList) {
             syncTables.add(schema.getTableName());
-            String dorisTable=converter.convert(schema.getTableName());
+            String dorisTable = converter.convert(schema.getTableName());
+
+            //Calculate the mapping relationship between upstream and 
downstream tables
+            tableMapping.put(schema.getTableIdentifier(), 
String.format("%s.%s", database, dorisTable));
             if (!dorisSystem.tableExists(database, dorisTable)) {
                 TableSchema dorisSchema = 
schema.convertTableSchema(tableConfig);
                 //set doris target database
@@ -144,12 +135,16 @@ public abstract class DatabaseSync {
 
         config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", 
syncTables) + ")");
         DataStreamSource<String> streamSource = buildCdcSource(env);
-        SingleOutputStreamOperator<Void> parsedStream = 
streamSource.process(new ParsingProcessFunction(converter));
-        for (String table : dorisTables) {
-            OutputTag<String> recordOutputTag = 
ParsingProcessFunction.createRecordOutputTag(table);
-            DataStream<String> sideOutput = 
parsedStream.getSideOutput(recordOutputTag);
-            int sinkParallel = 
sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, 
sideOutput.getParallelism());
-            
sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
+        if(singleSink){
+            streamSource.sinkTo(buildDorisSink());
+        }else{
+            SingleOutputStreamOperator<Void> parsedStream = 
streamSource.process(new ParsingProcessFunction(converter));
+            for (String table : dorisTables) {
+                OutputTag<String> recordOutputTag = 
ParsingProcessFunction.createRecordOutputTag(table);
+                DataStream<String> sideOutput = 
parsedStream.getSideOutput(recordOutputTag);
+                int sinkParallel = 
sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, 
sideOutput.getParallelism());
+                
sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
+            }
         }
     }
 
@@ -171,6 +166,13 @@ public abstract class DatabaseSync {
         return builder.build();
     }
 
+    /**
+     * Creates a Doris sink that is not bound to one table: delegates to the
+     * table-specific overload with a {@code null} table name.
+     *
+     * @return the configured sink
+     */
+    public DorisSink<String> buildDorisSink(){
+        return buildDorisSink(null);
+    }
+
     /**
      * create doris sink
      */
@@ -179,17 +181,20 @@ public abstract class DatabaseSync {
         String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
         String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
         String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
-        String labelPrefix = 
sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);
 
         DorisSink.Builder<String> builder = DorisSink.builder();
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
         dorisBuilder.setFenodes(fenodes)
                 .setBenodes(benodes)
-                .setTableIdentifier(database + "." + table)
                 .setUsername(user)
                 .setPassword(passwd);
         
sinkConfig.getOptional(DorisConfigOptions.AUTO_REDIRECT).ifPresent(dorisBuilder::setAutoRedirect);
 
+        //single sink not need table identifier
+        if(!singleSink && !StringUtils.isNullOrWhitespaceOnly(table)){
+            dorisBuilder.setTableIdentifier(database + "." + table);
+        }
+
         Properties pro = new Properties();
         //default json data format
         pro.setProperty("format", "json");
@@ -198,9 +203,9 @@ public abstract class DatabaseSync {
         Properties streamLoadProp = 
DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap());
         pro.putAll(streamLoadProp);
         DorisExecutionOptions.Builder executionBuilder = 
DorisExecutionOptions.builder()
-                .setLabelPrefix(String.join("-", labelPrefix, database, table))
                 .setStreamLoadProp(pro);
 
+        
sinkConfig.getOptional(DorisConfigOptions.SINK_LABEL_PREFIX).ifPresent(executionBuilder::setLabelPrefix);
         
sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE).ifPresent(executionBuilder::setDeletable);
         
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT).ifPresent(executionBuilder::setBufferCount);
         
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE).ifPresent(executionBuilder::setBufferSize);
@@ -234,6 +239,7 @@ public abstract class DatabaseSync {
                         .setDorisOptions(dorisBuilder.build())
                         .setNewSchemaChange(newSchemaChange)
                         .setExecutionOptions(executionOptions)
+                        .setTableMapping(tableMapping)
                         .build())
                 .setDorisOptions(dorisBuilder.build());
         return builder.build();
@@ -278,6 +284,82 @@ public abstract class DatabaseSync {
         return multiToOneRulesPattern;
     }
 
+
+    /** Sets the Flink stream execution environment the job is built on. */
+    public DatabaseSync setEnv(StreamExecutionEnvironment env) {
+        this.env = env;
+        return this;
+    }
+
+    /** Sets the {@link Configuration} stored in {@code config}. */
+    public DatabaseSync setConfig(Configuration config) {
+        this.config = config;
+        return this;
+    }
+
+    /** Sets the Doris database that synchronized tables are written to. */
+    public DatabaseSync setDatabase(String database) {
+        this.database = database;
+        return this;
+    }
+
+    /** Sets the regular expression of tables to include; compiled by {@code create()}. */
+    public DatabaseSync setIncludingTables(String includingTables) {
+        this.includingTables = includingTables;
+        return this;
+    }
+
+    /** Sets the regular expression of tables to exclude; compiled by {@code create()}. */
+    public DatabaseSync setExcludingTables(String excludingTables) {
+        this.excludingTables = excludingTables;
+        return this;
+    }
+
+    /** Sets the origin side of the multi-to-one rules parsed by {@code create()}. */
+    public DatabaseSync setMultiToOneOrigin(String multiToOneOrigin) {
+        this.multiToOneOrigin = multiToOneOrigin;
+        return this;
+    }
+
+    /** Sets the target side of the multi-to-one rules parsed by {@code create()}. */
+    public DatabaseSync setMultiToOneTarget(String multiToOneTarget) {
+        this.multiToOneTarget = multiToOneTarget;
+        return this;
+    }
+
+    /**
+     * Sets the table properties used when creating Doris tables.
+     *
+     * @param tableConfig table properties; {@code null} is treated as an empty map so
+     *     that {@code create()} can safely add its defaults
+     */
+    public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
+        // create() dereferences this map, so never let a null argument replace it.
+        this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
+        return this;
+    }
+
+    /** Sets the sink configuration read when building the Doris sink. */
+    public DatabaseSync setSinkConfig(Configuration sinkConfig) {
+        this.sinkConfig = sinkConfig;
+        return this;
+    }
+
+    /** Sets the {@code ignoreDefaultValue} flag. */
+    public DatabaseSync setIgnoreDefaultValue(boolean ignoreDefaultValue) {
+        this.ignoreDefaultValue = ignoreDefaultValue;
+        return this;
+    }
+
+    /** Sets the {@code createTableOnly} flag. */
+    public DatabaseSync setCreateTableOnly(boolean createTableOnly) {
+        this.createTableOnly = createTableOnly;
+        return this;
+    }
+
+    /** Sets the {@code newSchemaChange} flag passed to the serializer builder. */
+    public DatabaseSync setNewSchemaChange(boolean newSchemaChange) {
+        this.newSchemaChange = newSchemaChange;
+        return this;
+    }
+
+    /**
+     * Chooses the sink topology: when {@code true}, {@code build()} attaches one sink
+     * directly to the source stream instead of one sink per table.
+     */
+    public DatabaseSync setSingleSink(boolean singleSink) {
+        this.singleSink = singleSink;
+        return this;
+    }
+
+    /** Sets the prefix given to the table name converter by {@code create()}. */
+    public DatabaseSync setTablePrefix(String tablePrefix) {
+        this.tablePrefix = tablePrefix;
+        return this;
+    }
+
+    /** Sets the suffix given to the table name converter by {@code create()}. */
+    public DatabaseSync setTableSuffix(String tableSuffix) {
+        this.tableSuffix = tableSuffix;
+        return this;
+    }
+
     public static class TableNameConverter implements Serializable {
         private static final long serialVersionUID = 1L;
         private final String prefix;
@@ -322,4 +404,6 @@ public abstract class DatabaseSync {
             return target;
         }
     }
+
+
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
similarity index 96%
rename from 
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
rename to 
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
index 563c848..a1c17f6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
@@ -14,11 +14,10 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.doris.flink.tools.cdc.mysql;
+package org.apache.doris.flink.tools.cdc;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 9168cb5..51d8013 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.tools.cdc;
 import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.flink.util.StringUtils;
 
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
@@ -27,9 +28,11 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.StringJoiner;
 
 public abstract class SourceSchema {
     private final String databaseName;
+    private final String schemaName;
     private final String tableName;
     private final String tableComment;
     private final LinkedHashMap<String, FieldSchema> fields;
@@ -40,6 +43,7 @@ public abstract class SourceSchema {
             DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment)
             throws Exception {
         this.databaseName = databaseName;
+        this.schemaName = schemaName;
         this.tableName = tableName;
         this.tableComment = tableComment;
 
@@ -74,6 +78,26 @@ public abstract class SourceSchema {
 
     public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
 
+    public String getTableIdentifier(){
+        return getString(databaseName, schemaName, tableName);
+    }
+
+    public static String getString(String databaseName, String schemaName, String tableName) {
+        StringJoiner identifier = new StringJoiner(".");
+        if(!StringUtils.isNullOrWhitespaceOnly(databaseName)){
+            identifier.add(databaseName);
+        }
+        if(!StringUtils.isNullOrWhitespaceOnly(schemaName)){
+            identifier.add(schemaName);
+        }
+
+        if(!StringUtils.isNullOrWhitespaceOnly(tableName)){
+            identifier.add(tableName);
+        }
+
+        return identifier.toString();
+    }
+
     public TableSchema convertTableSchema(Map<String, String> tableProps) {
         TableSchema tableSchema = new TableSchema();
         tableSchema.setModel(this.model);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index bc037a8..de549ef 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -361,4 +362,61 @@ public class TestJsonDebeziumSchemaSerializer {
         }
         return targetField;
     }
+
+    @Test
+    public void testGetCdcTableIdentifier() throws Exception {
+        String insert = 
"{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\
 [...]
+        JsonNode recordRoot = objectMapper.readTree(insert);
+        String identifier = serializer.getCdcTableIdentifier(recordRoot);
+        Assert.assertEquals( "test.t1", identifier);
+
+        String insertSchema = 
"{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\
 [...]
+        String identifierSchema = serializer.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
+        Assert.assertEquals( "test.dbo.t1", identifierSchema);
+
+        String ddl = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"
 [...]
+        String ddlRes = serializer.getCdcTableIdentifier(objectMapper.readTree(ddl));
+        Assert.assertEquals( "test.t1", ddlRes);
+    }
+
+    @Test
+    public void testGetDorisTableIdentifier() throws Exception {
+        Map<String, String> map = new HashMap<>();
+        map.put("test.dbo.t1", "test.t1");
+        serializer.setTableMapping(map);
+        String identifier = serializer.getDorisTableIdentifier("test.dbo.t1");
+        Assert.assertEquals( "test.t1", identifier);
+
+        identifier = serializer.getDorisTableIdentifier("test.t1");
+        Assert.assertEquals("test.t1", identifier);
+
+        String tmp = dorisOptions.getTableIdentifier();
+        dorisOptions.setTableIdentifier(null);
+        identifier = serializer.getDorisTableIdentifier("test.t1");
+        Assert.assertNull( identifier);
+        dorisOptions.setTableIdentifier(tmp);
+    }
+
+    @Test
+    public void testSchemaChangeMultiTable() throws Exception {
+        Map<String, String> map = new HashMap<>();
+        map.put("mysql.t1", "doris.t1");
+        map.put("mysql.t2", "doris.t2");
+        serializer.setTableMapping(map);
+        String tmp = dorisOptions.getTableIdentifier();
+        dorisOptions.setTableIdentifier(null);
+        String ddl1 = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\
 [...]
+        String ddl2 = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t2\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\
 [...]
+        String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(600)";
+        String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(600)";
+
+        Assert.assertEquals(exceptDDL1, serializer.extractDDL(objectMapper.readTree(ddl1)));
+        Assert.assertEquals(exceptDDL2, serializer.extractDDL(objectMapper.readTree(ddl2)));
+
+        //Assert.assertEquals(exceptDDL1, serializer.extractDDLList(objectMapper.readTree(ddl1)));
+        //Assert.assertEquals(exceptDDL2, serializer.extractDDLList(objectMapper.readTree(ddl2)));
+
+        dorisOptions.setTableIdentifier(tmp);
+    }
+
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 875fb4c..f0493a9 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -72,7 +72,21 @@ public class CdcMysqlSyncDatabaseCase {
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+        databaseSync.setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setTablePrefix(tablePrefix)
+                .setTableSuffix(tableSuffix)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setMultiToOneOrigin(multiToOneOrigin)
+                .setMultiToOneTarget(multiToOneTarget)
+                .setIgnoreDefaultValue(ignoreDefaultValue)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(useNewSchemaChange)
+                .create();
         databaseSync.build();
         env.execute(String.format("MySQL-Doris Database Sync: %s", database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 9b6277f..6140138 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -76,7 +76,21 @@ public class CdcOraclelSyncDatabaseCase {
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new OracleDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+        databaseSync.setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setTablePrefix(tablePrefix)
+                .setTableSuffix(tableSuffix)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setMultiToOneOrigin(multiToOneOrigin)
+                .setMultiToOneTarget(multiToOneTarget)
+                .setIgnoreDefaultValue(ignoreDefaultValue)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(useNewSchemaChange)
+                .create();
         databaseSync.build();
         env.execute(String.format("Oracle-Doris Database Sync: %s", database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 87fa871..4887f36 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -79,7 +79,21 @@ public class CdcPostgresSyncDatabaseCase {
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new PostgresDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+        databaseSync.setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setTablePrefix(tablePrefix)
+                .setTableSuffix(tableSuffix)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setMultiToOneOrigin(multiToOneOrigin)
+                .setMultiToOneTarget(multiToOneTarget)
+                .setIgnoreDefaultValue(ignoreDefaultValue)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(useNewSchemaChange)
+                .create();
         databaseSync.build();
         env.execute(String.format("Postgres-Doris Database Sync: %s", database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index d247500..7129e77 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -77,7 +77,21 @@ public class CdcSqlServerSyncDatabaseCase {
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new SqlServerDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+        databaseSync.setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setTablePrefix(tablePrefix)
+                .setTableSuffix(tableSuffix)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setMultiToOneOrigin(multiToOneOrigin)
+                .setMultiToOneTarget(multiToOneTarget)
+                .setIgnoreDefaultValue(ignoreDefaultValue)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(useNewSchemaChange)
+                .create();
         databaseSync.build();
         env.execute(String.format("SqlServer-Doris Database Sync: %s", database));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to