This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new db76d2a3 [improve]The sql_parser model of schema change support automatic table creation (#435)
db76d2a3 is described below

commit db76d2a38d73d5c3f3dc2912aed4b270467fb074
Author: wudongliang <[email protected]>
AuthorDate: Tue Jul 23 11:30:54 2024 +0800

    [improve]The sql_parser model of schema change support automatic table creation (#435)
---
 .../doris/flink/catalog/doris/FieldSchema.java     |  18 +++
 .../doris/flink/catalog/doris/TableSchema.java     |  27 ++++
 .../flink/sink/schema/SQLParserSchemaManager.java  | 155 +++++++++++++++++++--
 .../jsondebezium/JsonDebeziumChangeUtils.java      |  37 +++++
 .../jsondebezium/JsonDebeziumSchemaChange.java     |   8 ++
 .../JsonDebeziumSchemaChangeImplV2.java            |  48 +------
 .../jsondebezium/SQLParserSchemaChange.java        |  32 ++++-
 .../sink/schema/SQLParserSchemaManagerTest.java    | 138 ++++++++++++++++++
 .../jsondebezium/TestJsonDebeziumChangeUtils.java  |  43 ++++++
 .../TestJsonDebeziumSchemaChangeImplV2.java        |  13 --
 .../jsondebezium/TestSQLParserSchemaChange.java    |  42 +++++-
 .../doris/flink/tools/cdc/MySQLDorisE2ECase.java   | 154 ++++++++++++++++++++
 12 files changed, 641 insertions(+), 74 deletions(-)
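
For orientation, a minimal sketch of the new entry point this commit introduces (the method and
types appear in the diff below; the DDL string and the "doris_db.t1" identifier are illustrative
placeholders, not values taken from the commit):

    import java.util.HashMap;

    import org.apache.doris.flink.catalog.doris.TableSchema;
    import org.apache.doris.flink.sink.schema.SQLParserSchemaManager;
    import org.apache.doris.flink.tools.cdc.SourceConnector;

    public class ParseCreateTableSketch {
        public static void main(String[] args) {
            SQLParserSchemaManager manager = new SQLParserSchemaManager();
            // Hypothetical MySQL DDL; any CREATE TABLE statement JSqlParser accepts should work here.
            String ddl = "CREATE TABLE t1 (id INT PRIMARY KEY, name VARCHAR(50))";
            TableSchema schema =
                    manager.parseCreateTableStatement(
                            SourceConnector.MYSQL, ddl, "doris_db.t1", new HashMap<>());
            // With a primary key detected, the schema is created with DataModel.UNIQUE and the
            // key columns double as distribute keys (see SQLParserSchemaManager in the diff below).
            System.out.println(schema);
        }
    }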

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
index 3d7f2765..a8d85e1f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
@@ -69,4 +69,22 @@ public class FieldSchema {
     public void setDefaultValue(String defaultValue) {
         this.defaultValue = defaultValue;
     }
+
+    @Override
+    public String toString() {
+        return "FieldSchema{"
+                + "name='"
+                + name
+                + '\''
+                + ", typeString='"
+                + typeString
+                + '\''
+                + ", defaultValue='"
+                + defaultValue
+                + '\''
+                + ", comment='"
+                + comment
+                + '\''
+                + '}';
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
index 4cc9098f..3a47a044 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -106,4 +106,31 @@ public class TableSchema {
     public Integer getTableBuckets() {
         return tableBuckets;
     }
+
+    @Override
+    public String toString() {
+        return "TableSchema{"
+                + "database='"
+                + database
+                + '\''
+                + ", table='"
+                + table
+                + '\''
+                + ", tableComment='"
+                + tableComment
+                + '\''
+                + ", fields="
+                + fields
+                + ", keys="
+                + String.join(",", keys)
+                + ", model="
+                + model.name()
+                + ", distributeKeys="
+                + String.join(",", distributeKeys)
+                + ", properties="
+                + properties
+                + ", tableBuckets="
+                + tableBuckets
+                + '}';
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
index 6f157cdc..5acedfc2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -18,6 +18,7 @@
 package org.apache.doris.flink.sink.schema;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
 
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
@@ -27,22 +28,32 @@ import net.sf.jsqlparser.statement.alter.AlterExpression;
 import net.sf.jsqlparser.statement.alter.AlterExpression.ColumnDataType;
 import net.sf.jsqlparser.statement.alter.AlterOperation;
 import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import net.sf.jsqlparser.statement.create.table.Index;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
-/** Use {@link net.sf.jsqlparser.parser.CCJSqlParserUtil} to parse SQL statements. */
+/** Use {@link CCJSqlParserUtil} to parse SQL statements. */
 public class SQLParserSchemaManager implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(SQLParserSchemaManager.class);
     private static final String DEFAULT = "DEFAULT";
     private static final String COMMENT = "COMMENT";
+    private static final String PRIMARY = "PRIMARY";
+    private static final String PRIMARY_KEY = "PRIMARY KEY";
+    private static final String UNIQUE = "UNIQUE";
 
     /**
      * Doris' schema change only supports ADD, DROP, and RENAME operations. This method is only used
@@ -96,6 +107,128 @@ public class SQLParserSchemaManager implements Serializable {
         return ddlList;
     }
 
+    public TableSchema parseCreateTableStatement(
+            SourceConnector sourceConnector,
+            String ddl,
+            String dorisTable,
+            Map<String, String> tableProperties) {
+        try {
+            Statement statement = CCJSqlParserUtil.parse(ddl);
+            if (statement instanceof CreateTable) {
+                CreateTable createTable = (CreateTable) statement;
+                Map<String, FieldSchema> columnFields = new LinkedHashMap<>();
+                List<String> pkKeys = new ArrayList<>();
+                createTable
+                        .getColumnDefinitions()
+                        .forEach(
+                                column -> {
+                                    String columnName = column.getColumnName();
+                                    ColDataType colDataType = column.getColDataType();
+                                    String dataType = parseDataType(colDataType, sourceConnector);
+                                    List<String> columnSpecs = column.getColumnSpecs();
+                                    String defaultValue = extractDefaultValue(columnSpecs);
+                                    String comment = extractComment(columnSpecs);
+                                    FieldSchema fieldSchema =
+                                            new FieldSchema(
+                                                    columnName, dataType, defaultValue, comment);
+                                    columnFields.put(columnName, fieldSchema);
+                                    extractColumnPrimaryKey(columnName, columnSpecs, pkKeys);
+                                });
+
+                List<Index> indexes = createTable.getIndexes();
+                extractIndexesPrimaryKey(indexes, pkKeys);
+
+                String[] dbTable = dorisTable.split("\\.");
+                Preconditions.checkArgument(dbTable.length == 2);
+                TableSchema tableSchema = new TableSchema();
+                tableSchema.setDatabase(dbTable[0]);
+                tableSchema.setTable(dbTable[1]);
+                tableSchema.setModel(pkKeys.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);
+                tableSchema.setFields(columnFields);
+                tableSchema.setKeys(pkKeys);
+                tableSchema.setTableComment(
+                        extractTableComment(createTable.getTableOptionsStrings()));
+                tableSchema.setDistributeKeys(
+                        JsonDebeziumChangeUtils.buildDistributeKeys(pkKeys, columnFields));
+                tableSchema.setProperties(tableProperties);
+                if (tableProperties.containsKey("table-buckets")) {
+                    String tableBucketsConfig = tableProperties.get("table-buckets");
+                    Map<String, Integer> tableBuckets =
+                            DatabaseSync.getTableBuckets(tableBucketsConfig);
+                    Integer buckets =
+                            JsonDebeziumChangeUtils.getTableSchemaBuckets(
+                                    tableBuckets, tableSchema.getTable());
+                    tableSchema.setTableBuckets(buckets);
+                }
+                return tableSchema;
+            } else {
+                LOG.warn(
+                        "Unsupported statement type. ddl={}, sourceConnector={}, dorisTable={}",
+                        ddl,
+                        sourceConnector.getConnectorName(),
+                        dorisTable);
+            }
+        } catch (JSQLParserException e) {
+            LOG.warn(
+                    "Failed to parse create table statement. ddl={}, sourceConnector={}, dorisTable={}",
+                    ddl,
+                    sourceConnector.getConnectorName(),
+                    dorisTable);
+        }
+        return null;
+    }
+
+    private void extractIndexesPrimaryKey(List<Index> indexes, List<String> pkKeys) {
+        if (CollectionUtils.isEmpty(indexes)) {
+            return;
+        }
+        indexes.stream()
+                .filter(
+                        index ->
+                                PRIMARY_KEY.equalsIgnoreCase(index.getType())
+                                        || UNIQUE.equalsIgnoreCase(index.getType()))
+                .flatMap(index -> index.getColumnsNames().stream())
+                .distinct()
+                .filter(
+                        primaryKey ->
+                                pkKeys.stream()
+                                        .noneMatch(pkKey -> pkKey.equalsIgnoreCase(primaryKey)))
+                .forEach(pkKeys::add);
+    }
+
+    private void extractColumnPrimaryKey(
+            String columnName, List<String> columnSpecs, List<String> pkKeys) {
+        if (CollectionUtils.isEmpty(columnSpecs)) {
+            return;
+        }
+        for (String columnSpec : columnSpecs) {
+            if (PRIMARY.equalsIgnoreCase(columnSpec)) {
+                pkKeys.add(columnName);
+            }
+        }
+    }
+
+    private String extractTableComment(List<String> tableOptionsStrings) {
+        if (CollectionUtils.isEmpty(tableOptionsStrings)) {
+            return null;
+        }
+        return extractAdjacentString(tableOptionsStrings, COMMENT);
+    }
+
+    private String parseDataType(ColDataType colDataType, SourceConnector sourceConnector) {
+        String dataType = colDataType.getDataType();
+        int length = 0;
+        int scale = 0;
+        if (CollectionUtils.isNotEmpty(colDataType.getArgumentsStringList())) {
+            List<String> argumentsStringList = colDataType.getArgumentsStringList();
+            length = Integer.parseInt(argumentsStringList.get(0));
+            if (argumentsStringList.size() == 2) {
+                scale = Integer.parseInt(argumentsStringList.get(1));
+            }
+        }
+        return JsonDebeziumChangeUtils.buildDorisTypeName(sourceConnector, dataType, length, scale);
+    }
+
     private String processDropColumnOperation(AlterExpression alterExpression, String dorisTable) {
         String dropColumnDDL =
                 SchemaChangeHelper.buildDropColumnDDL(dorisTable, alterExpression.getColumnName());
@@ -110,19 +243,7 @@ public class SQLParserSchemaManager implements Serializable {
         for (ColumnDataType columnDataType : colDataTypeList) {
             String columnName = columnDataType.getColumnName();
             ColDataType colDataType = columnDataType.getColDataType();
-            String datatype = colDataType.getDataType();
-            Integer length = null;
-            Integer scale = null;
-            if (CollectionUtils.isNotEmpty(colDataType.getArgumentsStringList())) {
-                List<String> argumentsStringList = colDataType.getArgumentsStringList();
-                length = Integer.parseInt(argumentsStringList.get(0));
-                if (argumentsStringList.size() == 2) {
-                    scale = Integer.parseInt(argumentsStringList.get(1));
-                }
-            }
-            datatype =
-                    JsonDebeziumChangeUtils.buildDorisTypeName(
-                            sourceConnector, datatype, length, scale);
+            String datatype = parseDataType(colDataType, sourceConnector);
 
             List<String> columnSpecs = columnDataType.getColumnSpecs();
             String defaultValue = extractDefaultValue(columnSpecs);
@@ -161,6 +282,9 @@ public class SQLParserSchemaManager implements Serializable {
 
     @VisibleForTesting
     public String extractDefaultValue(List<String> columnSpecs) {
+        if (CollectionUtils.isEmpty(columnSpecs)) {
+            return null;
+        }
         return extractAdjacentString(columnSpecs, DEFAULT);
     }
 
@@ -185,6 +309,9 @@ public class SQLParserSchemaManager implements Serializable {
 
     @VisibleForTesting
     public String extractComment(List<String> columnSpecs) {
+        if (CollectionUtils.isEmpty(columnSpecs)) {
+            return null;
+        }
         return extractAdjacentString(columnSpecs, COMMENT);
     }
 
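A note on the key extraction above: column-level PRIMARY specs are collected first
(extractColumnPrimaryKey), then table-level PRIMARY KEY and UNIQUE indexes are merged in with a
case-insensitive de-duplication (extractIndexesPrimaryKey). A standalone sketch of that merge,
using plain string lists instead of JSqlParser types (the column names here are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PkMergeSketch {
        public static void main(String[] args) {
            // pkKeys already holds keys found in column-level PRIMARY specs.
            List<String> pkKeys = new ArrayList<>(Arrays.asList("id"));
            // Columns declared through table-level PRIMARY KEY / UNIQUE indexes.
            List<String> indexColumns = Arrays.asList("ID", "email");
            indexColumns.stream()
                    .distinct()
                    .filter(col -> pkKeys.stream().noneMatch(k -> k.equalsIgnoreCase(col)))
                    .forEach(pkKeys::add);
            System.out.println(pkKeys); // [id, email] -- "ID" is dropped as a duplicate of "id"
        }
    }
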
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
index 36acecd4..0f176914 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -30,7 +31,12 @@ import org.apache.doris.flink.tools.cdc.oracle.OracleType;
 import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
 import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Pattern;
 
 import static org.apache.doris.flink.tools.cdc.SourceConnector.MYSQL;
 import static org.apache.doris.flink.tools.cdc.SourceConnector.ORACLE;
@@ -95,4 +101,35 @@ public class JsonDebeziumChangeUtils {
         }
         return dorisTypeName;
     }
+
+    public static List<String> buildDistributeKeys(
+            List<String> primaryKeys, Map<String, FieldSchema> fields) {
+        if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
+            return primaryKeys;
+        }
+        if (!fields.isEmpty()) {
+            Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
+            return Collections.singletonList(firstField.getKey());
+        }
+        return new ArrayList<>();
+    }
+
+    public static Integer getTableSchemaBuckets(
+            Map<String, Integer> tableBucketsMap, String tableName) {
+        if (tableBucketsMap != null) {
+            // Firstly, if the table name is in the table-buckets map, set the buckets of the table.
+            if (tableBucketsMap.containsKey(tableName)) {
+                return tableBucketsMap.get(tableName);
+            }
+            // Secondly, iterate over the map to find a corresponding regular expression match,
+            for (Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
+
+                Pattern pattern = Pattern.compile(entry.getKey());
+                if (pattern.matcher(tableName).matches()) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return null;
+    }
 }
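
The bucket lookup moved into this utility keeps its two-step order: an exact table-name match wins,
otherwise the first entry whose key, read as a regex, matches the name is used. Illustrative values
only (the table names are invented; real entries come from the table-buckets option):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;

    public class BucketLookupSketch {
        public static void main(String[] args) {
            Map<String, Integer> buckets = new HashMap<>();
            buckets.put("orders", 16); // exact name match is checked first
            buckets.put("logs_.*", 4); // then each key is tried as a regex
            System.out.println(JsonDebeziumChangeUtils.getTableSchemaBuckets(buckets, "orders")); // 16
            System.out.println(JsonDebeziumChangeUtils.getTableSchemaBuckets(buckets, "logs_2024")); // 4
            System.out.println(JsonDebeziumChangeUtils.getTableSchemaBuckets(buckets, "users")); // null
        }
    }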
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index a2164b72..15817da6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -66,6 +66,9 @@ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
     protected SchemaChangeManager schemaChangeManager;
     protected JsonDebeziumChangeContext changeContext;
     protected SourceConnector sourceConnector;
+    protected String targetDatabase;
+    protected String targetTablePrefix;
+    protected String targetTableSuffix;
 
     public abstract boolean schemaChange(JsonNode recordRoot);
 
@@ -189,6 +192,11 @@ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
         }
     }
 
+    protected String getCreateTableIdentifier(JsonNode record) {
+        String table = extractJsonNode(record.get("source"), "table");
+        return targetDatabase + "." + targetTablePrefix + table + targetTableSuffix;
+    }
+
     public Map<String, String> getTableMapping() {
         return tableMapping;
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 7ef975e2..f44602c9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -19,7 +19,6 @@ package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -45,13 +44,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -73,9 +70,6 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
     private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new LinkedHashMap<>();
     // create table properties
     private final Map<String, String> tableProperties;
-    private final String targetDatabase;
-    private final String targetTablePrefix;
-    private final String targetTableSuffix;
     private final Set<String> filledTables = new HashSet<>();
 
     public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
@@ -240,7 +234,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
         TableSchema tableSchema = new TableSchema();
         tableSchema.setFields(field);
         tableSchema.setKeys(pkList);
-        tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
+        tableSchema.setDistributeKeys(JsonDebeziumChangeUtils.buildDistributeKeys(pkList, field));
         tableSchema.setTableComment(tblComment);
         tableSchema.setProperties(tableProperties);
         tableSchema.setModel(pkList.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);
@@ -252,48 +246,14 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
         if (tableProperties.containsKey("table-buckets")) {
             String tableBucketsConfig = tableProperties.get("table-buckets");
             Map<String, Integer> tableBuckets = DatabaseSync.getTableBuckets(tableBucketsConfig);
-            Integer buckets = getTableSchemaBuckets(tableBuckets, tableSchema.getTable());
+            Integer buckets =
+                    JsonDebeziumChangeUtils.getTableSchemaBuckets(
+                            tableBuckets, tableSchema.getTable());
             tableSchema.setTableBuckets(buckets);
         }
         return tableSchema;
     }
 
-    @VisibleForTesting
-    public Integer getTableSchemaBuckets(Map<String, Integer> tableBucketsMap, String tableName) {
-        if (tableBucketsMap != null) {
-            // Firstly, if the table name is in the table-buckets map, set the buckets of the table.
-            if (tableBucketsMap.containsKey(tableName)) {
-                return tableBucketsMap.get(tableName);
-            }
-            // Secondly, iterate over the map to find a corresponding regular expression match,
-            for (Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
-
-                Pattern pattern = Pattern.compile(entry.getKey());
-                if (pattern.matcher(tableName).matches()) {
-                    return entry.getValue();
-                }
-            }
-        }
-        return null;
-    }
-
-    private List<String> buildDistributeKeys(
-            List<String> primaryKeys, Map<String, FieldSchema> fields) {
-        if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
-            return primaryKeys;
-        }
-        if (!fields.isEmpty()) {
-            Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
-            return Collections.singletonList(firstField.getKey());
-        }
-        return new ArrayList<>();
-    }
-
-    private String getCreateTableIdentifier(JsonNode record) {
-        String table = extractJsonNode(record.get("source"), "table");
-        return targetDatabase + "." + targetTablePrefix + table + targetTableSuffix;
-    }
-
     private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
             throws IOException, IllegalArgumentException {
         Map<String, Object> param =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index 6be3f72c..7e44acc6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.sink.schema.SQLParserSchemaManager;
 import org.apache.doris.flink.sink.schema.SchemaChangeManager;
 import org.apache.doris.flink.sink.writer.EventType;
@@ -42,6 +43,15 @@ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
         this.sqlParserSchemaManager = new SQLParserSchemaManager();
         this.tableMapping = changeContext.getTableMapping();
         this.objectMapper = changeContext.getObjectMapper();
+        this.targetDatabase = changeContext.getTargetDatabase();
+        this.targetTablePrefix =
+                changeContext.getTargetTablePrefix() == null
+                        ? ""
+                        : changeContext.getTargetTablePrefix();
+        this.targetTableSuffix =
+                changeContext.getTargetTableSuffix() == null
+                        ? ""
+                        : changeContext.getTargetTableSuffix();
     }
 
     @Override
@@ -64,8 +74,16 @@ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
             }
 
             if (eventType.equals(EventType.CREATE)) {
-                // TODO support auto create table
-                LOG.warn("Not auto support create table. recordRoot={}", recordRoot);
+                String dorisTable = getCreateTableIdentifier(recordRoot);
+                TableSchema tableSchema = tryParseCreateTableStatement(recordRoot, dorisTable);
+                status = schemaChangeManager.createTable(tableSchema);
+                if (status) {
+                    String cdcTbl = getCdcTableIdentifier(recordRoot);
+                    String dorisTbl = getCreateTableIdentifier(recordRoot);
+                    changeContext.getTableMapping().put(cdcTbl, dorisTbl);
+                    this.tableMapping = changeContext.getTableMapping();
+                    LOG.info("create table ddl status: {}", status);
+                }
             } else if (eventType.equals(EventType.ALTER)) {
                 Tuple2<String, String> dorisTableTuple = getDorisTableTuple(recordRoot);
                 if (dorisTableTuple == null) {
@@ -81,6 +99,16 @@ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
         return status;
     }
 
+    @VisibleForTesting
+    public TableSchema tryParseCreateTableStatement(JsonNode record, String dorisTable)
+            throws IOException {
+        JsonNode historyRecord = extractHistoryRecord(record);
+        String ddl = extractJsonNode(historyRecord, "ddl");
+        extractSourceConnector(record);
+        return sqlParserSchemaManager.parseCreateTableStatement(
+                sourceConnector, ddl, dorisTable, changeContext.getTableProperties());
+    }
+
     @VisibleForTesting
     public List<String> tryParserAlterDDLs(JsonNode record) throws IOException {
         String dorisTable =
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
index cbe3f08a..941bca46 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.schema;
 
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.junit.Assert;
 import org.junit.Before;
@@ -25,6 +26,7 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 public class SQLParserSchemaManagerTest {
@@ -203,4 +205,140 @@ public class SQLParserSchemaManagerTest {
         Assert.assertEquals("a\"bc\"d", schemaManager.removeContinuousChar("\"a\"bc\"d\"", '\"'));
         Assert.assertEquals("abc", schemaManager.removeContinuousChar("'abc'", '\''));
     }
+
+    @Test
+    public void testParseCreateTableStatement() {
+        String dorisTable = "doris.auto_tab";
+        String ddl =
+                "CREATE TABLE `test_sinka` (\n"
+                        + "  `id` int NOT NULL DEFAULT '10000' COMMENT 'id_test',\n"
+                        + "  `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3),\n"
+                        + "  `c1` int DEFAULT '999',\n"
+                        + "  `decimal_type` decimal(9,3) DEFAULT '1.000' COMMENT 'decimal_tes',\n"
+                        + "  `aaa` varchar(100) DEFAULT NULL,\n"
+                        + "  `decimal_type3` decimal(38,9) DEFAULT '1.123456789' COMMENT 'comment_test',\n"
+                        + "  `create_time3` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n"
+                        + "  PRIMARY KEY (`id`)\n"
+                        + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.MYSQL, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.0 [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
+
+    @Test
+    public void testParseCreateUniqueTableStatement() {
+        String dorisTable = "doris.auto_uni_tab";
+        String ddl =
+                "CREATE TABLE test_sink_unique (     id INT NOT NULL,     name VARCHAR(100) NOT NULL,     age INT,     email VARCHAR(100),     UNIQUE (email) )";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.MYSQL, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_uni_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distribut [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
+
+    @Test
+    public void testParseCreateDuplicateTableStatement() {
+        String dorisTable = "doris.auto_duptab";
+        String ddl =
+                "CREATE TABLE test_sink_duplicate (\n"
+                        + "    id INT,\n"
+                        + "    name VARCHAR(50),\n"
+                        + "    age INT,\n"
+                        + "    address VARCHAR(255)\n"
+                        + ")";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.MYSQL, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_duptab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=, model=DUPLICATE, distribu [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
+
+    @Test
+    public void testParseOracleTableStatement() {
+        String dorisTable = "doris.auto_tab";
+        String ddl =
+                "CREATE TABLE employees (\n"
+                        + "    employee_id NUMBER(10) NOT NULL,\n"
+                        + "    first_name VARCHAR2(50),\n"
+                        + "    last_name VARCHAR2(50) NOT NULL,\n"
+                        + "    email VARCHAR2(100) UNIQUE,\n"
+                        + "    phone_number VARCHAR2(20),\n"
+                        + "    hire_date DATE DEFAULT SYSDATE NOT NULL,\n"
+                        + "    job_id VARCHAR2(10) NOT NULL,\n"
+                        + "    salary NUMBER(8, 2),\n"
+                        + "    commission_pct NUMBER(2, 2),\n"
+                        + "    manager_id NUMBER(10),\n"
+                        + "    department_id NUMBER(10),\n"
+                        + "    CONSTRAINT pk_employee PRIMARY KEY (employee_id),\n"
+                        + "    CONSTRAINT fk_department FOREIGN KEY (department_id)\n"
+                        + "        REFERENCES departments(department_id)\n"
+                        + ");";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.ORACLE, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', com [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
+
+    @Test
+    public void testParseOraclePrimaryTableStatement() {
+        String dorisTable = "doris.auto_tab";
+        String ddl =
+                "CREATE TABLE employees (\n"
+                        + "    employee_id NUMBER(10) PRIMARY KEY,\n"
+                        + "    first_name VARCHAR2(50),\n"
+                        + "    last_name VARCHAR2(50) NOT NULL,\n"
+                        + "    email VARCHAR2(100),\n"
+                        + "    phone_number VARCHAR2(20),\n"
+                        + "    hire_date DATE DEFAULT SYSDATE NOT NULL,\n"
+                        + "    job_id VARCHAR2(10) NOT NULL,\n"
+                        + "    salary NUMBER(8, 2),\n"
+                        + "    commission_pct NUMBER(2, 2),\n"
+                        + "    manager_id NUMBER(10),\n"
+                        + "    department_id NUMBER(10)\n"
+                        + ");";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.ORACLE, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', com [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
+
+    @Test
+    public void testParseOracleDuplicateTableStatement() {
+        String dorisTable = "doris.auto_tab";
+        String ddl =
+                "CREATE TABLE orders (\n"
+                        + "    order_id NUMBER(10) NOT NULL,\n"
+                        + "    customer_id NUMBER(10) NOT NULL,\n"
+                        + "    order_date DATE DEFAULT SYSDATE NOT NULL,\n"
+                        + "    status VARCHAR2(20) CHECK (status IN ('PENDING', 'SHIPPED', 'DELIVERED', 'CANCELLED')),\n"
+                        + "    total_amount NUMBER(12, 2) NOT NULL,\n"
+                        + "    shipping_address VARCHAR2(255),\n"
+                        + "    delivery_date DATE,\n"
+                        + "    CONSTRAINT fk_customer FOREIGN KEY (customer_id)\n"
+                        + "        REFERENCES customers(customer_id),\n"
+                        + "    CONSTRAINT chk_total_amount CHECK (total_amount >= 0)\n"
+                        + ");";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.ORACLE, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(60)', defaultValue='null', comment=' [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java
new file mode 100644
index 00000000..c2a93893
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestJsonDebeziumChangeUtils {
+
+    @Test
+    public void testGetTableSchemaBuckets() {
+        Assert.assertNull(JsonDebeziumChangeUtils.getTableSchemaBuckets(null, null));
+        Map<String, Integer> map = new HashMap<>();
+        Assert.assertNull(JsonDebeziumChangeUtils.getTableSchemaBuckets(map, null));
+        map.put("tbl1", 1);
+        Assert.assertEquals(
+                JsonDebeziumChangeUtils.getTableSchemaBuckets(map, "tbl1").intValue(), 1);
+        map = new HashMap<>();
+        map.put("tbl2.*", 1);
+        Assert.assertEquals(
+                JsonDebeziumChangeUtils.getTableSchemaBuckets(map, "tbl2").intValue(), 1);
+        Assert.assertEquals(
+                JsonDebeziumChangeUtils.getTableSchemaBuckets(map, "tbl21").intValue(), 1);
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 041ebe51..a3c24d15 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -541,19 +541,6 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
         return filedSchemaMap;
     }
 
-    @Test
-    public void testGetTableSchemaBuckets() {
-        Assert.assertNull(schemaChange.getTableSchemaBuckets(null, null));
-        Map<String, Integer> map = new HashMap<>();
-        Assert.assertNull(schemaChange.getTableSchemaBuckets(map, null));
-        map.put("tbl1", 1);
-        Assert.assertEquals(schemaChange.getTableSchemaBuckets(map, "tbl1").intValue(), 1);
-        map = new HashMap<>();
-        map.put("tbl2.*", 1);
-        Assert.assertEquals(schemaChange.getTableSchemaBuckets(map, "tbl2").intValue(), 1);
-        Assert.assertEquals(schemaChange.getTableSchemaBuckets(map, "tbl21").intValue(), 1);
-    }
-
     @After
     public void after() {
         mockRestService.close();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
index d31ab04a..7fdf97d2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
@@ -18,12 +18,14 @@
 package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
@@ -39,7 +41,7 @@ public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
                         tableMapping,
                         null,
                         null,
-                        null,
+                        new HashMap<>(),
                         objectMapper,
                         null,
                         lineDelimiter,
@@ -138,4 +140,42 @@ public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
                 "ALTER TABLE `test`.`t1` RENAME COLUMN `create_time2` `create_time`",
                 changeNameList.get(0));
     }
+
+    @Test
+    public void testAutoCreateTable() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1721356080787,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sinka\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":16947,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":16947,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        JsonNode recordJsonNode = objectMapper.readTree(record);
+        TableSchema tableSchema =
+                schemaChange.tryParseCreateTableStatement(recordJsonNode, "doris.auto_tab");
+
+        String expected =
+                "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.0 [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
+
+    @Test
+    public void testAutoCreateUniqueTable() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1721370593734,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink_unique\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":24279,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":24279,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\": [...]
+        JsonNode recordJsonNode = objectMapper.readTree(record);
+        TableSchema tableSchema =
+                schemaChange.tryParseCreateTableStatement(recordJsonNode, "doris.auto_unique_tab");
+        String expected =
+                "TableSchema{database='doris', table='auto_unique_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distri [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
+
+    @Test
+    public void testAutoCreateDuplicateTable() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1721370811092,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink_duplicate\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":24588,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":24588,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\ [...]
+        JsonNode recordJsonNode = objectMapper.readTree(record);
+        TableSchema tableSchema =
+                schemaChange.tryParseCreateTableStatement(
+                        recordJsonNode, "doris.auto_duplicate_tab");
+        String expected =
+                "TableSchema{database='doris', table='auto_duplicate_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=, model=DUPLICATE, d [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index ff5ce03e..ab1dfe77 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import org.apache.doris.flink.DorisTestBase;
+import org.apache.doris.flink.sink.schema.SchemaChangeMode;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -61,6 +62,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
     private static final String TABLE_2 = "tbl2";
     private static final String TABLE_3 = "tbl3";
     private static final String TABLE_4 = "tbl4";
+    private static final String TABLE_SQL_PARSE = "tbl_sql_parse";
 
     private static final MySQLContainer MYSQL_CONTAINER =
             new MySQLContainer("mysql:8.0")
@@ -240,6 +242,158 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         jobClient.cancel().get();
     }
 
+    @Test
+    public void testMySQL2DorisSQLParse() throws Exception {
+        printClusterStatus();
+        initializeMySQLTable();
+        initializeDorisTable();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        Map<String, String> flinkMap = new HashMap<>();
+        flinkMap.put("execution.checkpointing.interval", "10s");
+        flinkMap.put("pipeline.operator-chaining", "false");
+        flinkMap.put("parallelism.default", "1");
+
+        Configuration configuration = Configuration.fromMap(flinkMap);
+        env.configure(configuration);
+
+        String database = DATABASE;
+        Map<String, String> mysqlConfig = new HashMap<>();
+        mysqlConfig.put("database-name", DATABASE);
+        mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
+        mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
+        mysqlConfig.put("username", MYSQL_USER);
+        mysqlConfig.put("password", MYSQL_PASSWD);
+        mysqlConfig.put("server-time-zone", "Asia/Shanghai");
+        Configuration config = Configuration.fromMap(mysqlConfig);
+
+        Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes", getFenodes());
+        sinkConfig.put("username", USERNAME);
+        sinkConfig.put("password", PASSWORD);
+        sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        sinkConfig.put("sink.check-interval", "5000");
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+
+        String includingTables = "tbl.*";
+        String excludingTables = "";
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync
+                .setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setIgnoreDefaultValue(false)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(true)
+                .setSchemaChangeMode(SchemaChangeMode.SQL_PARSER.getName())
+                // no single sink
+                .setSingleSink(true)
+                .create();
+        databaseSync.build();
+        JobClient jobClient = env.executeAsync();
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(10)));
+
+        // wait 2 times checkpoint
+        Thread.sleep(20000);
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+        String sql =
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
+        String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+        checkResult(expected, query1, 2);
+
+        // add incremental data
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_1_1',10)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_2_1',11)", DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_3_1',12)", DATABASE, TABLE_3));
+
+            statement.execute(
+                    String.format(
+                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
+        }
+
+        Thread.sleep(20000);
+        List<String> expected2 =
+                Arrays.asList(
+                        "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
+        sql =
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
+        String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+        checkResult(expected2, query2, 2);
+
+        // mock schema change
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("alter table %s.%s drop column age", DATABASE, TABLE_1));
+            Thread.sleep(20000);
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s  values ('doris_1_1_1','c1_val')",
+                            DATABASE, TABLE_1));
+        }
+        Thread.sleep(20000);
+        List<String> expected3 =
+                Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
+        sql = "select * from %s.%s order by 1";
+        String query3 = String.format(sql, DATABASE, TABLE_1);
+        checkResult(expected3, query3, 2);
+
+        // mock create table
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE %s.%s ( \n"
+                                    + "`name` varchar(256) primary key,\n"
+                                    + "`age` int\n"
+                                    + ")",
+                            DATABASE, TABLE_SQL_PARSE));
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s  values ('doris_1',1)", DATABASE, TABLE_SQL_PARSE));
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s  values ('doris_2',2)", DATABASE, TABLE_SQL_PARSE));
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s  values ('doris_3',3)", DATABASE, TABLE_SQL_PARSE));
+        }
+        Thread.sleep(20000);
+        List<String> expected4 = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+        sql = "select * from %s.%s order by 1";
+        String query4 = String.format(sql, DATABASE, TABLE_SQL_PARSE);
+        checkResult(expected4, query4, 2);
+
+        jobClient.cancel().get();
+    }
+
     @Test
     public void testMySQL2DorisByDefault() throws Exception {
         printClusterStatus();


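For reference, the configuration that routes schema change through the SQL parser, condensed from
testMySQL2DorisSQLParse above (env, config, sinkConf, and tableConfig are the objects built in the
test setup; only setNewSchemaChange plus setSchemaChangeMode select the sql_parser path):

    // Condensed sketch; variables come from the E2E test setup shown above.
    DatabaseSync databaseSync = new MysqlDatabaseSync();
    databaseSync
            .setEnv(env)
            .setDatabase(DATABASE)
            .setConfig(config)
            .setSinkConfig(sinkConf)
            .setTableConfig(tableConfig)
            .setNewSchemaChange(true)
            .setSchemaChangeMode(SchemaChangeMode.SQL_PARSER.getName())
            .create();
    databaseSync.build();
    env.executeAsync();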