This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 99f2da9  [fix]Fix multi-table sink, schema change result is incorrect 
(#313)
99f2da9 is described below

commit 99f2da9565b0f64ba56799f1b6ae89305692cbf2
Author: wudongliang <[email protected]>
AuthorDate: Thu Feb 1 16:10:09 2024 +0800

    [fix]Fix multi-table sink, schema change result is incorrect (#313)
---
 .../serializer/JsonDebeziumSchemaSerializer.java   |  23 ++-
 .../jsondebezium/JsonDebeziumChangeUtils.java      |  65 +++++++++
 .../jsondebezium/JsonDebeziumDataChange.java       |  41 +-----
 .../jsondebezium/JsonDebeziumSchemaChange.java     |  28 +---
 .../jsondebezium/JsonDebeziumSchemaChangeImpl.java |  11 +-
 .../JsonDebeziumSchemaChangeImplV2.java            |  47 +++---
 .../jsondebezium/TestJsonDebeziumDataChange.java   |  18 ++-
 .../TestJsonDebeziumSchemaChangeImplV2.java        | 158 ++++++++++++++++++---
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |   2 +
 9 files changed, 284 insertions(+), 109 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 5bea2d9..d34c1a3 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
@@ -36,8 +37,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
@@ -58,7 +61,6 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private final ObjectMapper objectMapper = new ObjectMapper();
     // table name of the cdc upstream, format is db.tbl
     private final String sourceTableName;
-    private boolean firstLoad;
     private final boolean newSchemaChange;
     private String lineDelimiter = LINE_DELIMITER_DEFAULT;
     private boolean ignoreUpdateBefore = true;
@@ -71,6 +73,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private String targetTableSuffix;
     private JsonDebeziumDataChange dataChange;
     private JsonDebeziumSchemaChange schemaChange;
+    private final Set<String> initTableSet = new HashSet<>();
 
     public JsonDebeziumSchemaSerializer(
             DorisOptions dorisOptions,
@@ -85,7 +88,6 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         JsonNodeFactory jsonNodeFactory = 
JsonNodeFactory.withExactBigDecimals(true);
         this.objectMapper.setNodeFactory(jsonNodeFactory);
         this.newSchemaChange = newSchemaChange;
-        this.firstLoad = true;
     }
 
     public JsonDebeziumSchemaSerializer(
@@ -156,13 +158,24 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             return null;
         }
 
-        if (firstLoad) {
-            schemaChange.init(recordRoot);
-            firstLoad = false;
+        this.tableMapping = schemaChange.getTableMapping();
+        String dorisTableName =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(
+                        recordRoot, dorisOptions, tableMapping);
+        if (initSchemaChange(dorisTableName)) {
+            schemaChange.init(recordRoot, dorisTableName);
         }
         return dataChange.serialize(record, recordRoot, op);
     }
 
+    private boolean initSchemaChange(String dorisTableName) {
+        if (initTableSet.contains(dorisTableName)) {
+            return false;
+        }
+        initTableSet.add(dorisTableName);
+        return true;
+    }
+
     private String extractJsonNode(JsonNode record, String key) {
         return record != null && record.get(key) != null && !(record.get(key) 
instanceof NullNode)
                 ? record.get(key).asText()
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
new file mode 100644
index 0000000..921607d
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.util.Map;
+
+public class JsonDebeziumChangeUtils {
+
+    public static String getDorisTableIdentifier(
+            JsonNode record, DorisOptions dorisOptions, Map<String, String> 
tableMapping) {
+        String identifier = getCdcTableIdentifier(record);
+        return getDorisTableIdentifier(identifier, dorisOptions, tableMapping);
+    }
+
+    public static String getDorisTableIdentifier(
+            String cdcTableIdentifier,
+            DorisOptions dorisOptions,
+            Map<String, String> tableMapping) {
+        if 
(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
+            return dorisOptions.getTableIdentifier();
+        }
+        if (!CollectionUtil.isNullOrEmpty(tableMapping)
+                && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
+                && tableMapping.get(cdcTableIdentifier) != null) {
+            return tableMapping.get(cdcTableIdentifier);
+        }
+        return null;
+    }
+
+    public static String getCdcTableIdentifier(JsonNode record) {
+        String db = extractJsonNode(record.get("source"), "db");
+        String schema = extractJsonNode(record.get("source"), "schema");
+        String table = extractJsonNode(record.get("source"), "table");
+        return SourceSchema.getString(db, schema, table);
+    }
+
+    public static String extractJsonNode(JsonNode record, String key) {
+        return record != null && record.get(key) != null && !(record.get(key) 
instanceof NullNode)
+                ? record.get(key).asText()
+                : null;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
index 5790c48..67aef02 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
@@ -17,19 +17,15 @@
 
 package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.sink.writer.ChangeEvent;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +52,7 @@ public class JsonDebeziumDataChange implements ChangeEvent {
     private final DorisOptions dorisOptions;
     private final boolean ignoreUpdateBefore;
     private final String lineDelimiter;
-    private JsonDebeziumChangeContext changeContext;
+    private final JsonDebeziumChangeContext changeContext;
 
     public JsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {
         this.changeContext = changeContext;
@@ -68,8 +64,11 @@ public class JsonDebeziumDataChange implements ChangeEvent {
 
     public DorisRecord serialize(String record, JsonNode recordRoot, String 
op) throws IOException {
         // Filter out table records that are not in tableMapping
-        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
-        String dorisTableIdentifier = 
getDorisTableIdentifier(cdcTableIdentifier);
+        Map<String, String> tableMapping = changeContext.getTableMapping();
+        String cdcTableIdentifier = 
JsonDebeziumChangeUtils.getCdcTableIdentifier(recordRoot);
+        String dorisTableIdentifier =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(
+                        cdcTableIdentifier, dorisOptions, tableMapping);
         if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
             LOG.warn(
                     "filter table {}, because it is not listened, record 
detail is {}",
@@ -123,34 +122,6 @@ public class JsonDebeziumDataChange implements ChangeEvent 
{
         return updateRow.toString().getBytes(StandardCharsets.UTF_8);
     }
 
-    @VisibleForTesting
-    public String getCdcTableIdentifier(JsonNode record) {
-        String db = extractJsonNode(record.get("source"), "db");
-        String schema = extractJsonNode(record.get("source"), "schema");
-        String table = extractJsonNode(record.get("source"), "table");
-        return SourceSchema.getString(db, schema, table);
-    }
-
-    @VisibleForTesting
-    public String getDorisTableIdentifier(String cdcTableIdentifier) {
-        if 
(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
-            return dorisOptions.getTableIdentifier();
-        }
-        Map<String, String> tableMapping = changeContext.getTableMapping();
-        if (!CollectionUtil.isNullOrEmpty(tableMapping)
-                && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
-                && tableMapping.get(cdcTableIdentifier) != null) {
-            return tableMapping.get(cdcTableIdentifier);
-        }
-        return null;
-    }
-
-    private String extractJsonNode(JsonNode record, String key) {
-        return record != null && record.get(key) != null && !(record.get(key) 
instanceof NullNode)
-                ? record.get(key).asText()
-                : null;
-    }
-
     private Map<String, Object> extractBeforeRow(JsonNode record) {
         return extractRow(record.get("before"));
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index 4c67164..f449857 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -19,7 +19,6 @@ package 
org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -60,7 +59,7 @@ public abstract class JsonDebeziumSchemaChange implements 
ChangeEvent {
 
     public abstract boolean schemaChange(JsonNode recordRoot);
 
-    public abstract void init(JsonNode recordRoot);
+    public abstract void init(JsonNode recordRoot, String dorisTableName);
 
     /** When cdc synchronizes multiple tables, it will capture multiple table 
schema changes. */
     protected boolean checkTable(JsonNode recordRoot) {
@@ -89,26 +88,9 @@ public abstract class JsonDebeziumSchemaChange implements 
ChangeEvent {
                 : null;
     }
 
-    @VisibleForTesting
-    public String getDorisTableIdentifier(String cdcTableIdentifier) {
-        if 
(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
-            return dorisOptions.getTableIdentifier();
-        }
-        if (!CollectionUtil.isNullOrEmpty(tableMapping)
-                && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
-                && tableMapping.get(cdcTableIdentifier) != null) {
-            return tableMapping.get(cdcTableIdentifier);
-        }
-        return null;
-    }
-
-    protected String getDorisTableIdentifier(JsonNode record) {
-        String identifier = getCdcTableIdentifier(record);
-        return getDorisTableIdentifier(identifier);
-    }
-
     protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
-        String identifier = getDorisTableIdentifier(record);
+        String identifier =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(record, 
dorisOptions, tableMapping);
         if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
             return null;
         }
@@ -136,6 +118,10 @@ public abstract class JsonDebeziumSchemaChange implements 
ChangeEvent {
         return record;
     }
 
+    public Map<String, String> getTableMapping() {
+        return tableMapping;
+    }
+
     @VisibleForTesting
     public void setSchemaChangeManager(SchemaChangeManager 
schemaChangeManager) {
         this.schemaChangeManager = schemaChangeManager;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
index 4cf0970..614f06a 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
@@ -55,7 +55,7 @@ public class JsonDebeziumSchemaChangeImpl extends 
JsonDebeziumSchemaChange {
     }
 
     @Override
-    public void init(JsonNode recordRoot) {
+    public void init(JsonNode recordRoot, String dorisTableName) {
         // do nothing
     }
 
@@ -120,7 +120,14 @@ public class JsonDebeziumSchemaChangeImpl extends 
JsonDebeziumSchemaChange {
                 String col = matcher.group(3);
                 String type = matcher.group(5);
                 type = handleType(type);
-                ddl = String.format(EXECUTE_DDL, 
getDorisTableIdentifier(record), op, col, type);
+                ddl =
+                        String.format(
+                                EXECUTE_DDL,
+                                
JsonDebeziumChangeUtils.getDorisTableIdentifier(
+                                        record, dorisOptions, tableMapping),
+                                op,
+                                col,
+                                type);
                 LOG.info("parse ddl:{}", ddl);
                 return ddl;
             }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index e5f6994..993b822 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -48,6 +48,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -66,13 +67,15 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
             Pattern.compile(
                     
"ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
                     Pattern.CASE_INSENSITIVE);
-    private Map<String, FieldSchema> originFieldSchemaMap;
+    // Origin schema per Doris table: table identifier -> (column name -> FieldSchema)
+    private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new 
LinkedHashMap<>();
     private SourceConnector sourceConnector;
     // create table properties
     private final Map<String, String> tableProperties;
     private String targetDatabase;
     private String targetTablePrefix;
     private String targetTableSuffix;
+    private final Set<String> filledTables = new HashSet<>();
 
     public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext 
changeContext) {
         this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, 
Pattern.CASE_INSENSITIVE);
@@ -95,14 +98,15 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
     }
 
     @Override
-    public void init(JsonNode recordRoot) {
-        originFieldSchemaMap = new LinkedHashMap<>();
+    public void init(JsonNode recordRoot, String dorisTableName) {
         Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
         if (CollectionUtils.isEmpty(columnNameSet)) {
             columnNameSet = extractBeforeRow(recordRoot).keySet();
         }
-        columnNameSet.forEach(
-                columnName -> originFieldSchemaMap.put(columnName, new 
FieldSchema()));
+        Map<String, FieldSchema> fieldSchemaMap = new LinkedHashMap<>();
+        columnNameSet.forEach(columnName -> fieldSchemaMap.put(columnName, new 
FieldSchema()));
+
+        originFieldSchemaMap.put(dorisTableName, fieldSchemaMap);
     }
 
     @Override
@@ -168,7 +172,8 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
     /** Parse Alter Event. */
     @VisibleForTesting
     public List<String> extractDDLList(JsonNode record) throws IOException {
-        String dorisTable = getDorisTableIdentifier(record);
+        String dorisTable =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(record, 
dorisOptions, tableMapping);
         JsonNode historyRecord = extractHistoryRecord(record);
         String ddl = extractJsonNode(historyRecord, "ddl");
         JsonNode tableChange = extractTableChange(record);
@@ -184,16 +189,20 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
             sourceConnector =
                     SourceConnector.valueOf(
                             
record.get("source").get("connector").asText().toUpperCase());
-            fillOriginSchema(columns);
+        }
+        if (!filledTables.contains(dorisTable)) {
+            fillOriginSchema(dorisTable, columns);
+            filledTables.add(dorisTable);
         }
 
+        Map<String, FieldSchema> fieldSchemaMap = 
originFieldSchemaMap.get(dorisTable);
         // rename ddl
         Matcher renameMatcher = renameDDLPattern.matcher(ddl);
         if (renameMatcher.find()) {
             String oldColumnName = renameMatcher.group(2);
             String newColumnName = renameMatcher.group(3);
             return SchemaChangeHelper.generateRenameDDLSql(
-                    dorisTable, oldColumnName, newColumnName, 
originFieldSchemaMap);
+                    dorisTable, oldColumnName, newColumnName, fieldSchemaMap);
         }
 
         // add/drop ddl
@@ -201,7 +210,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
         for (JsonNode column : columns) {
             buildFieldSchema(updateFiledSchema, column);
         }
-        SchemaChangeHelper.compareSchema(updateFiledSchema, 
originFieldSchemaMap);
+        SchemaChangeHelper.compareSchema(updateFiledSchema, fieldSchemaMap);
         // In order to avoid other source table column change operations other 
than add/drop/rename,
         // which may lead to the accidental deletion of the doris column.
         Matcher matcher = addDropDDLPattern.matcher(ddl);
@@ -330,16 +339,17 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
     }
 
     @VisibleForTesting
-    public void fillOriginSchema(JsonNode columns) {
-        if (Objects.nonNull(originFieldSchemaMap)) {
+    public void fillOriginSchema(String tableName, JsonNode columns) {
+        Map<String, FieldSchema> fieldSchemaMap = 
originFieldSchemaMap.get(tableName);
+        if (Objects.nonNull(fieldSchemaMap)) {
             for (JsonNode column : columns) {
                 String fieldName = column.get("name").asText();
-                if (originFieldSchemaMap.containsKey(fieldName)) {
+                if (fieldSchemaMap.containsKey(fieldName)) {
                     String dorisTypeName = buildDorisTypeName(column);
                     String defaultValue =
                             handleDefaultValue(extractJsonNode(column, 
"defaultValueExpression"));
                     String comment = extractJsonNode(column, "comment");
-                    FieldSchema fieldSchema = 
originFieldSchemaMap.get(fieldName);
+                    FieldSchema fieldSchema = fieldSchemaMap.get(fieldName);
                     fieldSchema.setName(fieldName);
                     fieldSchema.setTypeString(dorisTypeName);
                     fieldSchema.setComment(comment);
@@ -351,8 +361,10 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
                     "Current schema change failed! You need to ensure that "
                             + "there is data in the table."
                             + dorisOptions.getTableIdentifier());
-            originFieldSchemaMap = new LinkedHashMap<>();
-            columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, 
column));
+            fieldSchemaMap = new LinkedHashMap<>();
+            Map<String, FieldSchema> finalFieldSchemaMap = fieldSchemaMap;
+            columns.forEach(column -> buildFieldSchema(finalFieldSchemaMap, 
column));
+            originFieldSchemaMap.put(tableName, fieldSchemaMap);
         }
     }
 
@@ -403,12 +415,13 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
     }
 
     @VisibleForTesting
-    public void setOriginFieldSchemaMap(Map<String, FieldSchema> 
originFieldSchemaMap) {
+    public void setOriginFieldSchemaMap(
+            Map<String, Map<String, FieldSchema>> originFieldSchemaMap) {
         this.originFieldSchemaMap = originFieldSchemaMap;
     }
 
     @VisibleForTesting
-    public Map<String, FieldSchema> getOriginFieldSchemaMap() {
+    public Map<String, Map<String, FieldSchema>> getOriginFieldSchemaMap() {
         return originFieldSchemaMap;
     }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
index 8900339..4891d82 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
@@ -156,32 +156,38 @@ public class TestJsonDebeziumDataChange extends 
TestJsonDebeziumChangeBase {
         String insert =
                 
"{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"t
 [...]
         JsonNode recordRoot = objectMapper.readTree(insert);
-        String identifier = dataChange.getCdcTableIdentifier(recordRoot);
+        String identifier = 
JsonDebeziumChangeUtils.getCdcTableIdentifier(recordRoot);
         Assert.assertEquals("test.t1", identifier);
 
         String insertSchema =
                 
"{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"
 [...]
         String identifierSchema =
-                
dataChange.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
+                
JsonDebeziumChangeUtils.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
         Assert.assertEquals("test.dbo.t1", identifierSchema);
 
         String ddl =
                 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
 [...]
-        String ddlRes = 
dataChange.getCdcTableIdentifier(objectMapper.readTree(ddl));
+        String ddlRes = 
JsonDebeziumChangeUtils.getCdcTableIdentifier(objectMapper.readTree(ddl));
         Assert.assertEquals("test.t1", ddlRes);
     }
 
     @Test
     public void testGetDorisTableIdentifier() throws Exception {
-        String identifier = dataChange.getDorisTableIdentifier("test.dbo.t1");
+        String identifier =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(
+                        "test.dbo.t1", dorisOptions, tableMapping);
         Assert.assertEquals("test.t1", identifier);
 
-        identifier = dataChange.getDorisTableIdentifier("test.t1");
+        identifier =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(
+                        "test.t1", dorisOptions, tableMapping);
         Assert.assertEquals("test.t1", identifier);
 
         String tmp = dorisOptions.getTableIdentifier();
         dorisOptions.setTableIdentifier(null);
-        identifier = dataChange.getDorisTableIdentifier("test.t1");
+        identifier =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(
+                        "test.t1", dorisOptions, tableMapping);
         Assert.assertNull(identifier);
         dorisOptions.setTableIdentifier(tmp);
     }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 8aca521..4801e88 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -73,16 +73,18 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
         String sql3 = "ALTER TABLE `test`.`t1` DROP COLUMN `c13`";
         List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3);
 
-        Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>();
-        originFiledSchemaMap.put("c2", new FieldSchema());
-        originFiledSchemaMap.put("c555", new FieldSchema());
-        originFiledSchemaMap.put("c666", new FieldSchema());
-        originFiledSchemaMap.put("c4", new FieldSchema());
-        originFiledSchemaMap.put("c13", new FieldSchema());
-
-        String record =
+        Map<String, Map<String, FieldSchema>> originFiledSchemaMap = new 
LinkedHashMap<>();
+        Map<String, FieldSchema> filedSchemaMap = new LinkedHashMap<>();
+        filedSchemaMap.put("c2", new FieldSchema());
+        filedSchemaMap.put("c555", new FieldSchema());
+        filedSchemaMap.put("c666", new FieldSchema());
+        filedSchemaMap.put("c4", new FieldSchema());
+        filedSchemaMap.put("c13", new FieldSchema());
+        originFiledSchemaMap.put("test.t1", filedSchemaMap);
+
+        String record1 =
                 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
 [...]
-        JsonNode recordRoot = objectMapper.readTree(record);
+        JsonNode recordRoot = objectMapper.readTree(record1);
         schemaChange.setOriginFieldSchemaMap(originFiledSchemaMap);
         List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
         for (int i = 0; i < ddlSQLList.size(); i++) {
@@ -157,15 +159,18 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
                 "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
         srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null));
 
+        String tableName = "db.test_fill";
         schemaChange.setSourceConnector("mysql");
         String columnsString =
                 
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"
 [...]
         JsonNode columns = objectMapper.readTree(columnsString);
-        schemaChange.fillOriginSchema(columns);
-        Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap();
+        schemaChange.fillOriginSchema(tableName, columns);
+        Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+                schemaChange.getOriginFieldSchemaMap();
+        Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(tableName);
 
         Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
-                originFieldSchemaMap.entrySet().iterator();
+                fieldSchemaMap.entrySet().iterator();
         for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
             FieldSchema srcFiledSchema = entry.getValue();
             Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
@@ -180,6 +185,102 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
         }
     }
 
+    @Test
+    public void testMultipleFillOriginSchema() throws IOException {
+        Map<String, Map<String, FieldSchema>> originFiledSchema = buildOriginFiledSchema();
+        String tableName1 = "db.test_fill";
+        String tableName2 = "test.t1";
+
+        schemaChange.setOriginFieldSchemaMap(originFiledSchema);
+        schemaChange.setSourceConnector("mysql");
+        String columnsString1 =
+                
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"
 [...]
+        JsonNode columns1 = objectMapper.readTree(columnsString1);
+
+        String columnsString2 =
+                
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"10000\",\"enumValues\":[]},{\"name\":\"c2\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":nul
 [...]
+        JsonNode columns2 = objectMapper.readTree(columnsString2);
+
+        schemaChange.fillOriginSchema(tableName1, columns1);
+        schemaChange.fillOriginSchema(tableName2, columns2);
+        Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+                schemaChange.getOriginFieldSchemaMap();
+
+        Map<String, FieldSchema> originSchema1 = originFieldSchemaMap.get(tableName1);
+        Map<String, FieldSchema> originSchema2 = originFieldSchemaMap.get(tableName2);
+
+        Map<String, Map<String, FieldSchema>> scrFiledSchema = buildSrcFiledSchema();
+        Map<String, FieldSchema> scrSchema1 = scrFiledSchema.get(tableName1);
+        Map<String, FieldSchema> scrSchema2 = scrFiledSchema.get(tableName2);
+
+        compareResults(originSchema1, scrSchema1);
+        compareResults(originSchema2, scrSchema2);
+    }
+
+    private void compareResults(
+            Map<String, FieldSchema> originSchema, Map<String, FieldSchema> scrSchema) {
+        Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
+                originSchema.entrySet().iterator();
+        for (Entry<String, FieldSchema> srcEntry : scrSchema.entrySet()) {
+            FieldSchema srcFiledSchema = srcEntry.getValue();
+            Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
+
+            Assert.assertEquals(srcEntry.getKey(), originField.getKey());
+            Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
+            Assert.assertEquals(
+                    srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
+            Assert.assertEquals(
+                    srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
+            Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
+        }
+    }
+
+    private Map<String, Map<String, FieldSchema>> buildSrcFiledSchema() {
+        String tab1 = "db.test_fill";
+        String tab2 = "test.t1";
+        Map<String, Map<String, FieldSchema>> scrFiledSchema = new LinkedHashMap<>();
+        Map<String, FieldSchema> filedSchemaMap1 = new LinkedHashMap<>();
+        filedSchemaMap1.put("id", new FieldSchema("id", "INT", null, null));
+        filedSchemaMap1.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
+        filedSchemaMap1.put("test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
+        filedSchemaMap1.put("c1", new FieldSchema("c1", "INT", "100", null));
+        scrFiledSchema.put(tab1, filedSchemaMap1);
+
+        Map<String, FieldSchema> filedSchemaMap2 = new LinkedHashMap<>();
+        filedSchemaMap2.put("id", new FieldSchema("id", "INT", "10000", null));
+        filedSchemaMap2.put("c2", new FieldSchema("c2", "INT", null, null));
+        filedSchemaMap2.put("c555", new FieldSchema("c555", "VARCHAR(300)", null, null));
+        filedSchemaMap2.put("c666", new FieldSchema("c666", "INT", "100", null));
+        filedSchemaMap2.put("c4", new FieldSchema("c4", "BIGINT", "555", null));
+        filedSchemaMap2.put("c199", new FieldSchema("c199", "INT", null, null));
+        filedSchemaMap2.put("c12", new FieldSchema("c12", "INT", "100", null));
+        scrFiledSchema.put(tab2, filedSchemaMap2);
+        return scrFiledSchema;
+    }
+
+    private Map<String, Map<String, FieldSchema>> buildOriginFiledSchema() {
+        String tab1 = "db.test_fill";
+        String tab2 = "test.t1";
+        Map<String, Map<String, FieldSchema>> originFiledSchema = new LinkedHashMap<>();
+        Map<String, FieldSchema> filedSchemaMap1 = new LinkedHashMap<>();
+        filedSchemaMap1.put("id", new FieldSchema());
+        filedSchemaMap1.put("name", new FieldSchema());
+        filedSchemaMap1.put("test_time", new FieldSchema());
+        filedSchemaMap1.put("c1", new FieldSchema());
+        originFiledSchema.put(tab1, filedSchemaMap1);
+
+        Map<String, FieldSchema> filedSchemaMap2 = new LinkedHashMap<>();
+        filedSchemaMap2.put("id", new FieldSchema());
+        filedSchemaMap2.put("c2", new FieldSchema());
+        filedSchemaMap2.put("c555", new FieldSchema());
+        filedSchemaMap2.put("c666", new FieldSchema());
+        filedSchemaMap2.put("c4", new FieldSchema());
+        filedSchemaMap2.put("c199", new FieldSchema());
+        filedSchemaMap2.put("c12", new FieldSchema());
+        originFiledSchema.put(tab2, filedSchemaMap2);
+        return originFiledSchema;
+    }
+
     @Test
     public void testBuildMysql2DorisTypeName() throws IOException {
         String columnInfo =
@@ -204,14 +305,16 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
     public void testExtractDDLListRename() throws IOException {
         String columnInfo =
                 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\
 [...]
-        Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap();
+        Map<String, Map<String, FieldSchema>> originFieldSchemaHashMap = new LinkedHashMap<>();
+        Map<String, FieldSchema> fieldSchemaHashMap = Maps.newHashMap();
         JsonNode record = objectMapper.readTree(columnInfo);
         schemaChange.setSourceConnector("mysql");
 
-        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
-        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
-        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
-        schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap);
+        fieldSchemaHashMap.put("id", new FieldSchema("id", "INT", "", ""));
+        fieldSchemaHashMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+        fieldSchemaHashMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+        originFieldSchemaHashMap.put("test.t1", fieldSchemaHashMap);
+        schemaChange.setOriginFieldSchemaMap(originFieldSchemaHashMap);
 
         List<String> ddlList = schemaChange.extractDDLList(record);
         Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c3` `c333`", ddlList.get(0));
@@ -239,15 +342,21 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
 
     @Test
     public void testGetDorisTableIdentifier() throws Exception {
-        String identifier = schemaChange.getDorisTableIdentifier("test.dbo.t1");
+        String identifier =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(
+                        "test.dbo.t1", dorisOptions, tableMapping);
         Assert.assertEquals("test.t1", identifier);
 
-        identifier = schemaChange.getDorisTableIdentifier("test.t1");
+        identifier =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(
+                        "test.t1", dorisOptions, tableMapping);
         Assert.assertEquals("test.t1", identifier);
 
         String tmp = dorisOptions.getTableIdentifier();
         dorisOptions.setTableIdentifier(null);
-        identifier = schemaChange.getDorisTableIdentifier("test.t1");
+        identifier =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(
+                        "test.t1", dorisOptions, tableMapping);
         Assert.assertNull(identifier);
         dorisOptions.setTableIdentifier(tmp);
     }
@@ -293,15 +402,18 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
                 "test_ts_6",
                 new FieldSchema("test_ts_6", "DATETIMEV2(6)", "current_timestamp", null));
 
+        String tableName = "db.test_fill";
         schemaChange.setSourceConnector("mysql");
         String columnsString =
                 
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValu
 [...]
         JsonNode columns = objectMapper.readTree(columnsString);
-        schemaChange.fillOriginSchema(columns);
-        Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap();
+        schemaChange.fillOriginSchema(tableName, columns);
+        Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+                schemaChange.getOriginFieldSchemaMap();
+        Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(tableName);
 
         Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
-                originFieldSchemaMap.entrySet().iterator();
+                fieldSchemaMap.entrySet().iterator();
         for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
             FieldSchema srcFiledSchema = entry.getValue();
             Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 3cc9db8..88176e8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -72,6 +72,7 @@ public class CdcMysqlSyncDatabaseCase {
         String multiToOneTarget = "a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
+        boolean singleSink = false;
         DatabaseSync databaseSync = new MysqlDatabaseSync();
         databaseSync
                 .setEnv(env)
@@ -88,6 +89,7 @@ public class CdcMysqlSyncDatabaseCase {
                 .setTableConfig(tableConfig)
                 .setCreateTableOnly(false)
                 .setNewSchemaChange(useNewSchemaChange)
+                .setSingleSink(singleSink)
                 .create();
         databaseSync.build();
         env.execute(String.format("MySQL-Doris Database Sync: %s", database));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to