This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ab170aaa1 [INLONG-7660][Sort] Support DDL model for MySQL connector 
when running in all migrate mode (#7846)
ab170aaa1 is described below

commit ab170aaa18eff53cc8b9dabbaad647e4fa6b2cd2
Author: Schnapps <[email protected]>
AuthorDate: Wed Apr 19 19:08:26 2023 +0800

    [INLONG-7660][Sort] Support DDL model for MySQL connector when running in 
all migrate mode (#7846)
---
 inlong-common/pom.xml                              |   4 +
 .../sort/protocol/ddl/Utils/ColumnUtils.java       | 168 +++++++++++++++
 .../inlong/sort/protocol/ddl/enums/AlterType.java} |  39 +---
 .../inlong/sort/protocol/ddl/enums/IndexType.java} |  37 +---
 .../sort/protocol/ddl/enums/OperationType.java}    |  43 ++--
 .../sort/protocol/ddl/enums/PositionType.java}     |  42 ++--
 .../protocol/ddl/expressions/AlterColumn.java}     |  63 +++---
 .../sort/protocol/ddl/expressions/Column.java      |  68 ++++++
 .../sort/protocol/ddl/expressions/Position.java}   |  53 ++---
 .../inlong/sort/protocol/ddl/indexes/Index.java}   |  32 +--
 .../protocol/ddl/operations/AlterOperation.java}   |  53 ++---
 .../ddl/operations/CreateTableOperation.java       |  70 +++++++
 .../ddl/operations/DropTableOperation.java}        |  50 ++---
 .../sort/protocol/ddl/operations/Operation.java    |  49 +++++
 .../ddl/operations/RenameTableOperation.java}      |  50 ++---
 .../ddl/operations/TruncateTableOperation.java}    |  50 ++---
 .../table/RowDataDebeziumDeserializeSchema.java    |   1 +
 .../mysql/source/reader/MySqlRecordEmitter.java    |  44 +++-
 .../cdc/mysql/table/MySqlReadableMetadata.java     | 177 +---------------
 .../inlong/sort/cdc/mysql/utils/MetaDataUtils.java | 233 +++++++++++++++++++++
 .../sort/cdc/mysql/utils/OperationUtils.java       | 227 ++++++++++++++++++++
 .../org/apache/inlong/sort/cdc/TestOperation.java  |  85 ++++++++
 inlong-sort/sort-formats/format-json/pom.xml       |   6 +-
 .../inlong/sort/formats/json/canal/CanalJson.java  |  44 ++++
 .../sort/formats/json/debezium/DebeziumJson.java   |  46 +++-
 .../json/canal/CanalJsonSerializationTest.java     |  85 ++++++++
 .../json/canal/DebeziumJsonSerializationTest.java  |  78 +++++++
 licenses/inlong-sort-connectors/LICENSE            |   2 +-
 pom.xml                                            |   6 +
 29 files changed, 1382 insertions(+), 523 deletions(-)

diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index c2ed3ba40..882f33f02 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -84,6 +84,10 @@
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.jsqlparser</groupId>
+            <artifactId>jsqlparser</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/Utils/ColumnUtils.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/Utils/ColumnUtils.java
new file mode 100644
index 000000000..84f74d567
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/Utils/ColumnUtils.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column.ColumnBuilder;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+
+/**
+ * Utils for parsing a statement from sqlParser into a column object.
+ */
+public class ColumnUtils {
+
+    public static final String DEFAULT = "default";
+    public static final String NULL = "null";
+    public static final String NOT = "not";
+    public static final String COMMENT = "comment";
+    public static final String AFTER = "after";
+
+    /**
+     * Parse a column definition into a Column object.
+     * This method is used for alter operations, where the isFirst flag
+     * indicates whether the column is placed first in the table.
+     */
+    public static Column parseColumnWithPosition(boolean isFirst,
+            Map<String, Integer> sqlType,
+            ColumnDefinition columnDefinition) {
+
+        ColDataType colDataType = columnDefinition.getColDataType();
+
+        List<String> definitions = new ArrayList<>();
+        if (colDataType.getArgumentsStringList() != null) {
+            definitions.addAll(colDataType.getArgumentsStringList());
+        }
+
+        List<String> columnSpecs = columnDefinition.getColumnSpecs();
+
+        ColumnBuilder columnBuilder = Column.builder();
+        String columnName = reformatName(columnDefinition.getColumnName());
+        columnBuilder.name(columnName)
+                .definition(definitions).isNullable(parseNullable(columnSpecs))
+                .defaultValue(parseDefaultValue(columnSpecs))
+                .jdbcType(sqlType.get(columnName))
+                .comment(parseComment(columnSpecs));
+
+        if (isFirst) {
+            // the column is in the first position of one table
+            columnBuilder.position(new Position(PositionType.FIRST, null));
+        } else {
+            columnBuilder.position(parsePosition(columnSpecs));
+        }
+
+        return columnBuilder.build();
+    }
+
+    /**
+     * parse column definitions to Column list.
+     * this method is used for createTable operation.
+     * @param sqlType the sql type map
+     * @param columnDefinitions the column definition list
+     * @return the column list
+     */
+    public static List<Column> parseColumns(Map<String, Integer> sqlType,
+            List<ColumnDefinition> columnDefinitions) {
+        List<Column> columns = new ArrayList<>();
+        columnDefinitions.forEach(columnDefinition -> {
+            columns.add(parseColumnWithPosition(false, sqlType, 
columnDefinition));
+        });
+        return columns;
+    }
+
+    public static String parseDefaultValue(List<String> specs) {
+        return removeContinuousQuotes(parseAdjacentString(specs, DEFAULT, 
false));
+    }
+
+    public static boolean parseNullable(List<String> specs) {
+        return !parseAdjacentString(specs, NULL, true).equalsIgnoreCase(NOT);
+    }
+
+    public static String parseComment(List<String> specs) {
+        return removeContinuousQuotes(parseAdjacentString(specs, COMMENT, 
false));
+    }
+
+    public static Position parsePosition(List<String> specs) {
+        String afterColumn = reformatName(parseAdjacentString(specs, AFTER, 
false));
+        if (!afterColumn.isEmpty()) {
+            return new Position(PositionType.AFTER, afterColumn);
+        }
+        return null;
+    }
+
+    /**
+     * get the string before or after the specific string in a list
+     * @param stringList the string list
+     * @param specificString the specific string
+     * @param front whether to return the string before (true) or after (false) the specific string
+     * @return the string before or after the specific string, or an empty string if none is found
+     */
+    public static String parseAdjacentString(List<String> stringList,
+            String specificString, boolean front) {
+
+        if (stringList == null || stringList.isEmpty()) {
+            return "";
+        }
+
+        for (int i = 0; i < stringList.size(); i++) {
+            if (stringList.get(i).equalsIgnoreCase(specificString)) {
+                if (front && i > 0) {
+                    return stringList.get(i - 1);
+                } else if (i < stringList.size() - 1) {
+                    return stringList.get(i + 1);
+                }
+            }
+        }
+        return "";
+
+    }
+
+    /**
+     * remove the continuous char in the string from both sides.
+     * @param str the input string, target the char to be removed
+     * @return the string without continuous chars from both sides
+     */
+    public static String removeContinuousChar(String str, char target) {
+        if (str == null || str.length() < 2) {
+            return str;
+        }
+        int start = 0;
+        int end = str.length() - 1;
+        while (start <= end && str.charAt(start) == target) {
+            start++;
+        }
+        while (end >= start && str.charAt(end) == target) {
+            end--;
+        }
+        return str.substring(start, end + 1);
+    }
+
+    public static String removeContinuousQuotes(String str) {
+        return removeContinuousChar(str, '\'');
+    }
+
+    public static String reformatName(String str) {
+        return removeContinuousChar(str, '`');
+    }
+
+}
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
similarity index 51%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
index feec23042..c6813b399 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
@@ -15,36 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.enums;
 
-import io.debezium.relational.history.TableChanges;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder
-public class DebeziumJson {
-
-    private Map<String, String> before;
-    private Map<String, Object> after;
-    private Source source;
-    private TableChanges.TableChange tableChange;
-    private long tsMs;
-    private String op;
-    private Boolean incremental;
-
-    @Builder
-    @Data
-    public static class Source {
+/**
+ * Alter type for alter column operation
+ */
+public enum AlterType {
 
-        private String name;
-        private String db;
-        private String table;
-        private List<String> pkNames;
-        private Map<String, Integer> sqlType;
-        private Map<String, String> mysqlType;
-    }
+    RENAME_COLUMN,
+    ADD_COLUMN,
+    DROP_COLUMN,
+    MODIFY_COLUMN,
+    CHANGE_COLUMN
 
 }
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/IndexType.java
similarity index 51%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/IndexType.java
index feec23042..c140e5b8d 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/IndexType.java
@@ -15,36 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.enums;
 
-import io.debezium.relational.history.TableChanges;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder
-public class DebeziumJson {
-
-    private Map<String, String> before;
-    private Map<String, Object> after;
-    private Source source;
-    private TableChanges.TableChange tableChange;
-    private long tsMs;
-    private String op;
-    private Boolean incremental;
-
-    @Builder
-    @Data
-    public static class Source {
+/**
+ * Index type for create table operation.
+ * Only normal indexes and primary keys are supported.
+ */
+public enum IndexType {
 
-        private String name;
-        private String db;
-        private String table;
-        private List<String> pkNames;
-        private Map<String, Integer> sqlType;
-        private Map<String, String> mysqlType;
-    }
+    NORMAL_INDEX,
+    PRIMARY_KEY
 
 }
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/OperationType.java
similarity index 51%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/OperationType.java
index feec23042..b5b6dee82 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/OperationType.java
@@ -15,36 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.enums;
 
-import io.debezium.relational.history.TableChanges;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder
-public class DebeziumJson {
-
-    private Map<String, String> before;
-    private Map<String, Object> after;
-    private Source source;
-    private TableChanges.TableChange tableChange;
-    private long tsMs;
-    private String op;
-    private Boolean incremental;
-
-    @Builder
-    @Data
-    public static class Source {
+/**
+ * Operation type for ddl operation
+ */
+public enum OperationType {
 
-        private String name;
-        private String db;
-        private String table;
-        private List<String> pkNames;
-        private Map<String, Integer> sqlType;
-        private Map<String, String> mysqlType;
-    }
+    CREATE,
+    ALTER,
+    DROP,
+    RENAME,
+    TRUNCATE,
+    INSERT,
+    UPDATE,
+    DELETE,
+    OTHER
 
 }
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/PositionType.java
similarity index 51%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/PositionType.java
index feec23042..1eb15a59f 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/PositionType.java
@@ -15,36 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.enums;
 
-import io.debezium.relational.history.TableChanges;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder
-public class DebeziumJson {
-
-    private Map<String, String> before;
-    private Map<String, Object> after;
-    private Source source;
-    private TableChanges.TableChange tableChange;
-    private long tsMs;
-    private String op;
-    private Boolean incremental;
-
-    @Builder
-    @Data
-    public static class Source {
+/**
+ * Position type for add column operation
+ */
+public enum PositionType {
 
-        private String name;
-        private String db;
-        private String table;
-        private List<String> pkNames;
-        private Map<String, Integer> sqlType;
-        private Map<String, String> mysqlType;
-    }
+    /**
+     * add column to first position of a table
+     */
+    FIRST,
 
+    /**
+     * add column after a certain column
+     */
+    AFTER
 }
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
similarity index 53%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
index c3cafc2b5..23b3e1b7e 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
@@ -15,48 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.expressions;
 
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
 import lombok.Data;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
 
-@Builder
+/**
+ * Alter column expression.
+ */
 @JsonInclude(Include.NON_NULL)
 @Data
-public class CanalJson {
-
-    @JsonProperty("data")
-    private List<Map<String, Object>> data;
-    @JsonProperty("es")
-    private long es;
-    @JsonProperty("table")
-    private String table;
-    @JsonProperty("type")
-    private String type;
-    @JsonProperty("database")
-    private String database;
-    @JsonProperty("ts")
-    private long ts;
-    @JsonProperty("sql")
-    private String sql;
-    @JsonProperty("mysqlType")
-    private Map<String, String> mysqlType;
-    @JsonProperty("sqlType")
-    private Map<String, Integer> sqlType;
-    @JsonProperty("isDdl")
-    private boolean isDdl;
-    @JsonProperty("pkNames")
-    private List<String> pkNames;
-    @JsonProperty("schema")
-    private String schema;
-    @JsonProperty("oracleType")
-    private Map<String, String> oracleType;
-    @JsonProperty("incremental")
-    private Boolean incremental;
+public class AlterColumn {
+
+    @JsonProperty("alterType")
+    private AlterType alterType;
+
+    @JsonProperty("newColumn")
+    private Column newColumn;
+
+    @JsonProperty("oldColumn")
+    private Column oldColumn;
+
+    @JsonCreator
+    public AlterColumn(@JsonProperty("alterType") AlterType alterType,
+            @JsonProperty("newColumn") Column newColumn,
+            @JsonProperty("oldColumn") Column oldColumn) {
+        this.alterType = alterType;
+        this.newColumn = newColumn;
+        this.oldColumn = oldColumn;
+    }
 
+    public AlterColumn(@JsonProperty("alterType") AlterType alterType) {
+        this.alterType = alterType;
+    }
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java
new file mode 100644
index 000000000..612e0f7e9
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.expressions;
+
+import java.util.List;
+import lombok.Builder;
+import lombok.Data;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Column represents a column in a table.
+ */
+@Data
+@Builder
+@JsonInclude(Include.NON_NULL)
+public class Column {
+
+    @JsonProperty("name")
+    private String name;
+    @JsonProperty("definition")
+    private List<String> definition;
+    @JsonProperty("jdbcType")
+    private int jdbcType;
+    @JsonProperty("position")
+    private Position position;
+    @JsonProperty("isNullable")
+    private boolean isNullable;
+    @JsonProperty("defaultValue")
+    private String defaultValue;
+    @JsonProperty("comment")
+    private String comment;
+
+    @JsonCreator
+    public Column(@JsonProperty("name") String name, 
@JsonProperty("definition") List<String> definition,
+            @JsonProperty("jdbcType") int jdbcType, @JsonProperty("position") 
Position position,
+            @JsonProperty("isNullable") boolean isNullable, 
@JsonProperty("defaultValue") String defaultValue,
+            @JsonProperty("comment") String comment) {
+        this.name = name;
+        this.definition = definition;
+        this.jdbcType = jdbcType;
+        this.position = position;
+        this.defaultValue = defaultValue;
+        this.comment = comment;
+        this.isNullable = isNullable;
+    }
+
+    public Column(@JsonProperty("name") String name) {
+        this.name = name;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java
similarity index 53%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java
index c3cafc2b5..d07cdff85 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java
@@ -15,48 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.expressions;
 
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
 import lombok.Data;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
 
-@Builder
+/**
+ * Position represents the position of a column in a table.
+ * It can be either first in the table or after a specific column.
+ */
 @JsonInclude(Include.NON_NULL)
 @Data
-public class CanalJson {
+public class Position {
+
+    @JsonProperty("positionType")
+    private PositionType positionType;
 
-    @JsonProperty("data")
-    private List<Map<String, Object>> data;
-    @JsonProperty("es")
-    private long es;
-    @JsonProperty("table")
-    private String table;
-    @JsonProperty("type")
-    private String type;
-    @JsonProperty("database")
-    private String database;
-    @JsonProperty("ts")
-    private long ts;
-    @JsonProperty("sql")
-    private String sql;
-    @JsonProperty("mysqlType")
-    private Map<String, String> mysqlType;
-    @JsonProperty("sqlType")
-    private Map<String, Integer> sqlType;
-    @JsonProperty("isDdl")
-    private boolean isDdl;
-    @JsonProperty("pkNames")
-    private List<String> pkNames;
-    @JsonProperty("schema")
-    private String schema;
-    @JsonProperty("oracleType")
-    private Map<String, String> oracleType;
-    @JsonProperty("incremental")
-    private Boolean incremental;
+    @JsonProperty("columnName")
+    private String columnName;
 
+    @JsonCreator
+    public Position(@JsonProperty("positionType") PositionType positionType,
+            @JsonProperty("columnName") String columnName) {
+        this.positionType = positionType;
+        this.columnName = columnName;
+    }
 }
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/indexes/Index.java
similarity index 54%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/indexes/Index.java
index feec23042..e518865b1 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/indexes/Index.java
@@ -15,36 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.indexes;
 
-import io.debezium.relational.history.TableChanges;
 import java.util.List;
-import java.util.Map;
-import lombok.Builder;
 import lombok.Data;
+import org.apache.inlong.sort.protocol.ddl.enums.IndexType;
 
+/**
+ * Index for create table operation
+ */
 @Data
-@Builder
-public class DebeziumJson {
+public class Index {
 
-    private Map<String, String> before;
-    private Map<String, Object> after;
-    private Source source;
-    private TableChanges.TableChange tableChange;
-    private long tsMs;
-    private String op;
-    private Boolean incremental;
+    private IndexType indexType;
 
-    @Builder
-    @Data
-    public static class Source {
+    private String indexName;
 
-        private String name;
-        private String db;
-        private String table;
-        private List<String> pkNames;
-        private Map<String, Integer> sqlType;
-        private Map<String, String> mysqlType;
-    }
+    private List<String> indexColumns;
 
 }
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/AlterOperation.java
similarity index 54%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/AlterOperation.java
index c3cafc2b5..a04bb4c4a 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/AlterOperation.java
@@ -15,48 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.operations;
 
 import java.util.List;
-import java.util.Map;
-import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
 
-@Builder
+/**
+ * Alter operation which contains alter columns.
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("alterOperation")
 @JsonInclude(Include.NON_NULL)
 @Data
-public class CanalJson {
+public class AlterOperation extends Operation {
+
+    @JsonProperty("alterColumns")
+    private List<AlterColumn> alterColumns;
 
-    @JsonProperty("data")
-    private List<Map<String, Object>> data;
-    @JsonProperty("es")
-    private long es;
-    @JsonProperty("table")
-    private String table;
-    @JsonProperty("type")
-    private String type;
-    @JsonProperty("database")
-    private String database;
-    @JsonProperty("ts")
-    private long ts;
-    @JsonProperty("sql")
-    private String sql;
-    @JsonProperty("mysqlType")
-    private Map<String, String> mysqlType;
-    @JsonProperty("sqlType")
-    private Map<String, Integer> sqlType;
-    @JsonProperty("isDdl")
-    private boolean isDdl;
-    @JsonProperty("pkNames")
-    private List<String> pkNames;
-    @JsonProperty("schema")
-    private String schema;
-    @JsonProperty("oracleType")
-    private Map<String, String> oracleType;
-    @JsonProperty("incremental")
-    private Boolean incremental;
+    @JsonCreator
+    public AlterOperation(@JsonProperty("alterColumns") List<AlterColumn> 
alterColumns) {
+        super(OperationType.ALTER);
+        this.alterColumns = alterColumns;
+    }
 
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java
new file mode 100644
index 000000000..63db1cd8c
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+import org.apache.inlong.sort.protocol.ddl.indexes.Index;
+
+/**
+ * CreateTableOperation represents a create table operation.
+ * It can be either "create table like" or "create table with columns and indexes".
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("createTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class CreateTableOperation extends Operation {
+
+    @JsonProperty("columns")
+    private List<Column> columns;
+
+    @JsonProperty("indexes")
+    private List<Index> indexes;
+
+    @JsonProperty("likeTable")
+    private String likeTable;
+
+    @JsonProperty("comment")
+    private String comment;
+
+    @JsonCreator
+    public CreateTableOperation(@JsonProperty("columns") List<Column> columns,
+            @JsonProperty("indexes") List<Index> indexes,
+            @JsonProperty("likeTable") String likeTable,
+            @JsonProperty("comment") String comment) {
+        super(OperationType.CREATE);
+        this.columns = columns;
+        this.indexes = indexes;
+        this.likeTable = likeTable;
+        this.comment = comment;
+    }
+
+    public CreateTableOperation() {
+        super(OperationType.CREATE);
+    }
+
+}
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java
similarity index 52%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java
index c3cafc2b5..34921a188 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java
@@ -15,48 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.operations;
 
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
 
-@Builder
+/**
+ * DropTableOperation represents a drop operation on table
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("dropTableOperation")
 @JsonInclude(Include.NON_NULL)
 @Data
-public class CanalJson {
+public class DropTableOperation extends Operation {
 
-    @JsonProperty("data")
-    private List<Map<String, Object>> data;
-    @JsonProperty("es")
-    private long es;
-    @JsonProperty("table")
-    private String table;
-    @JsonProperty("type")
-    private String type;
-    @JsonProperty("database")
-    private String database;
-    @JsonProperty("ts")
-    private long ts;
-    @JsonProperty("sql")
-    private String sql;
-    @JsonProperty("mysqlType")
-    private Map<String, String> mysqlType;
-    @JsonProperty("sqlType")
-    private Map<String, Integer> sqlType;
-    @JsonProperty("isDdl")
-    private boolean isDdl;
-    @JsonProperty("pkNames")
-    private List<String> pkNames;
-    @JsonProperty("schema")
-    private String schema;
-    @JsonProperty("oracleType")
-    private Map<String, String> oracleType;
-    @JsonProperty("incremental")
-    private Boolean incremental;
+    @JsonCreator
+    public DropTableOperation() {
+        super(OperationType.DROP);
+    }
 
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
new file mode 100644
index 000000000..3d44a242c
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+/**
+ * Operation represents a ddl operation.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = AlterOperation.class, name = 
"alterOperation"),
+        @JsonSubTypes.Type(value = CreateTableOperation.class, name = 
"createTableOperation"),
+        @JsonSubTypes.Type(value = DropTableOperation.class, name = 
"dropTableOperation"),
+        @JsonSubTypes.Type(value = TruncateTableOperation.class, name = 
"truncateTableOperation"),
+        @JsonSubTypes.Type(value = RenameTableOperation.class, name = 
"renameTableOperation")
+})
+@Data
+@NoArgsConstructor
+public abstract class Operation {
+
+    @JsonProperty("operationType")
+    private OperationType operationType;
+
+    public Operation(@JsonProperty("operationType") OperationType type) {
+        this.operationType = type;
+    }
+
+}
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java
similarity index 52%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java
index c3cafc2b5..7dc7205bc 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java
@@ -15,48 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.operations;
 
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
 
-@Builder
+/**
+ * RenameTableOperation represents a rename operation on table
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("renameTableOperation")
 @JsonInclude(Include.NON_NULL)
 @Data
-public class CanalJson {
+public class RenameTableOperation extends Operation {
 
-    @JsonProperty("data")
-    private List<Map<String, Object>> data;
-    @JsonProperty("es")
-    private long es;
-    @JsonProperty("table")
-    private String table;
-    @JsonProperty("type")
-    private String type;
-    @JsonProperty("database")
-    private String database;
-    @JsonProperty("ts")
-    private long ts;
-    @JsonProperty("sql")
-    private String sql;
-    @JsonProperty("mysqlType")
-    private Map<String, String> mysqlType;
-    @JsonProperty("sqlType")
-    private Map<String, Integer> sqlType;
-    @JsonProperty("isDdl")
-    private boolean isDdl;
-    @JsonProperty("pkNames")
-    private List<String> pkNames;
-    @JsonProperty("schema")
-    private String schema;
-    @JsonProperty("oracleType")
-    private Map<String, String> oracleType;
-    @JsonProperty("incremental")
-    private Boolean incremental;
+    @JsonCreator
+    public RenameTableOperation() {
+        super(OperationType.RENAME);
+    }
 
 }
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java
similarity index 52%
copy from 
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java
index c3cafc2b5..b30af9b07 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java
@@ -15,48 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.operations;
 
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
 
-@Builder
+/**
+ * TruncateTableOperation represents a truncate operation on table
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("truncateTableOperation")
 @JsonInclude(Include.NON_NULL)
 @Data
-public class CanalJson {
+public class TruncateTableOperation extends Operation {
 
-    @JsonProperty("data")
-    private List<Map<String, Object>> data;
-    @JsonProperty("es")
-    private long es;
-    @JsonProperty("table")
-    private String table;
-    @JsonProperty("type")
-    private String type;
-    @JsonProperty("database")
-    private String database;
-    @JsonProperty("ts")
-    private long ts;
-    @JsonProperty("sql")
-    private String sql;
-    @JsonProperty("mysqlType")
-    private Map<String, String> mysqlType;
-    @JsonProperty("sqlType")
-    private Map<String, Integer> sqlType;
-    @JsonProperty("isDdl")
-    private boolean isDdl;
-    @JsonProperty("pkNames")
-    private List<String> pkNames;
-    @JsonProperty("schema")
-    private String schema;
-    @JsonProperty("oracleType")
-    private Map<String, String> oracleType;
-    @JsonProperty("incremental")
-    private Boolean incremental;
+    @JsonCreator
+    public TruncateTableOperation() {
+        super(OperationType.TRUNCATE);
+    }
 
 }
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
index 536ffb57e..9e26d58c4 100644
--- 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -746,6 +746,7 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
             emit(record, insert, tableSchema, out);
         } catch (Exception e) {
             LOG.error("Failed to extract DDL record {}", record, e);
+            throw new RuntimeException(e);
         }
 
     }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 94f7b2c60..695c4363d 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -27,8 +27,15 @@ import io.debezium.relational.Tables;
 import io.debezium.relational.history.HistoryRecord;
 import io.debezium.relational.history.TableChanges;
 import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.Map.Entry;
+import java.util.Set;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.schema.Table;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.RenameTableStatement;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.base.enums.ReadPhase;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
@@ -46,6 +53,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static 
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getFetchTimestamp;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -57,6 +66,7 @@ import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isHighWa
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isWatermarkEvent;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.toSnapshotRecord;
+import static 
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.reformatName;
 
 /**
  * The {@link RecordEmitter} implementation for {@link MySqlSourceReader}.
@@ -87,6 +97,7 @@ public final class MySqlRecordEmitter<T>
     private volatile long snapProcessTime = 0L;
 
     private boolean includeIncremental;
+    public final ObjectMapper objectMapper = new ObjectMapper();
 
     public MySqlRecordEmitter(
             DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
@@ -126,8 +137,9 @@ public final class MySqlRecordEmitter<T>
 
             if (tableChanges.isEmpty()) {
                 TableId tableId = RecordUtils.getTableId(element);
-                // if this table is one of the captured tables, output the ddl 
element
-                if 
(splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)) {
+                // if this table is one of the captured tables, output the ddl 
element.
+                if 
(splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)
+                        || shouldOutputRenameDdl(element, tableId)) {
                     outputDdlElement(element, output, splitState, null);
                 }
             }
@@ -185,6 +197,34 @@ public final class MySqlRecordEmitter<T>
         }
     }
 
+    /**
+     * If the rename operation is "rename a to b" where a is the captured table,
+     * this method extracts table names a and b; if either table name is the 
captured table,
+     * we should output the ddl element.
+     */
+    private boolean shouldOutputRenameDdl(SourceRecord element, TableId 
tableId) {
+        try {
+            String ddl = objectMapper.readTree(((Struct) 
element.value()).get(HISTORY_RECORD_FIELD).toString())
+                    .get(DDL_FIELD_NAME).asText();
+            Statement statement = CCJSqlParserUtil.parse(ddl);
+            if (statement instanceof RenameTableStatement) {
+                RenameTableStatement renameTableStatement = 
(RenameTableStatement) statement;
+                Set<Entry<Table, Table>> tableNames = 
renameTableStatement.getTableNames();
+                for (Entry<Table, Table> entry : tableNames) {
+                    Table oldTable = entry.getKey();
+                    Table newTable = entry.getValue();
+                    if 
(reformatName(oldTable.getName()).equals(tableId.table()) ||
+                            
reformatName(newTable.getName()).equals(tableId.table())) {
+                        return true;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("parse ddl error {}", element, e);
+        }
+        return false;
+    }
+
     private void updateSnapshotRecord(SourceRecord element, MySqlSplitState 
splitState) {
         if (splitState.isSnapshotSplitState() && includeIncremental) {
             toSnapshotRecord(element);
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index a44993891..498244e45 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -17,17 +17,16 @@
 
 package org.apache.inlong.sort.cdc.mysql.table;
 
-import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
-
-import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static 
org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getCanalData;
+import static 
org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getDebeziumData;
+import static org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getMetaData;
+import static org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getOpType;
 
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
 import io.debezium.data.Envelope.FieldName;
 import io.debezium.relational.Table;
 import io.debezium.relational.history.TableChanges;
-import io.debezium.relational.history.TableChanges.TableChange;
-import java.util.LinkedHashMap;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericArrayData;
@@ -38,17 +37,11 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.inlong.sort.cdc.base.debezium.table.MetadataConverter;
-import org.apache.inlong.sort.cdc.base.util.RecordUtils;
-import org.apache.inlong.sort.formats.json.canal.CanalJson;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import javax.annotation.Nullable;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -163,28 +156,7 @@ public enum MySqlReadableMetadata {
                 @Override
                 public Object read(SourceRecord record,
                         @Nullable TableChanges.TableChange tableSchema, 
RowData rowData) {
-                    // construct debezium json
-                    Struct messageStruct = (Struct) record.value();
-                    Struct sourceStruct = 
messageStruct.getStruct(FieldName.SOURCE);
-                    GenericRowData data = (GenericRowData) rowData;
-                    Map<String, Object> field = (Map<String, Object>) 
data.getField(0);
-
-                    Source source = Source.builder().db(getMetaData(record, 
AbstractSourceInfo.DATABASE_NAME_KEY))
-                            .table(getMetaData(record, 
AbstractSourceInfo.TABLE_NAME_KEY))
-                            
.name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
-                            .sqlType(getSqlType(tableSchema))
-                            .pkNames(getPkNames(tableSchema))
-                            .mysqlType(getMysqlType(tableSchema))
-                            .build();
-                    DebeziumJson debeziumJson = 
DebeziumJson.builder().after(field).source(source)
-                            
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
-                            
.tableChange(tableSchema).incremental(isSnapshotRecord(sourceStruct)).build();
-
-                    try {
-                        return 
StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
-                    } catch (Exception e) {
-                        throw new IllegalStateException("exception occurs when 
get meta data", e);
-                    }
+                    return getDebeziumData(record, tableSchema, 
(GenericRowData) rowData);
                 }
             }),
 
@@ -436,46 +408,6 @@ public enum MySqlReadableMetadata {
                 }
             });
 
-    private static StringData getCanalData(SourceRecord record, GenericRowData 
rowData,
-            TableChange tableSchema) {
-        Struct messageStruct = (Struct) record.value();
-        Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
-        // tableName
-        String tableName = getMetaData(record, 
AbstractSourceInfo.TABLE_NAME_KEY);
-        // databaseName
-        String databaseName = getMetaData(record, 
AbstractSourceInfo.DATABASE_NAME_KEY);
-        // opTs
-        long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
-        // actual data
-        GenericRowData data = rowData;
-        Map<String, Object> field = (Map<String, Object>) data.getField(0);
-        List<Map<String, Object>> dataList = new ArrayList<>();
-
-        CanalJson canalJson = CanalJson.builder()
-                .database(databaseName)
-                .es(opTs).pkNames(getPkNames(tableSchema))
-                .mysqlType(getMysqlType(tableSchema)).table(tableName)
-                .type(getCanalOpType(rowData)).sqlType(getSqlType(tableSchema))
-                .incremental(isSnapshotRecord(sourceStruct)).build();
-
-        try {
-            if (RecordUtils.isDdlRecord(messageStruct)) {
-                canalJson.setSql((String) field.get(DDL_FIELD_NAME));
-                canalJson.setDdl(true);
-                canalJson.setData(dataList);
-            } else {
-                canalJson.setDdl(false);
-                canalJson.setTs((Long) messageStruct.get(FieldName.TIMESTAMP));
-                dataList.add(field);
-                canalJson.setData(dataList);
-            }
-            return 
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
-        } catch (Exception e) {
-            throw new IllegalStateException("exception occurs when get meta 
data", e);
-        }
-
-    }
-
     private final String key;
     private final DataType dataType;
     private final MetadataConverter converter;
@@ -487,103 +419,6 @@ public enum MySqlReadableMetadata {
         this.converter = converter;
     }
 
-    private static String getOpType(SourceRecord record) {
-        String opType;
-        final Envelope.Operation op = Envelope.operationFor(record);
-        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
-            opType = "INSERT";
-        } else if (op == Envelope.Operation.DELETE) {
-            opType = "DELETE";
-        } else {
-            opType = "UPDATE";
-        }
-        return opType;
-    }
-
-    private static String getCanalOpType(GenericRowData record) {
-        String opType;
-        switch (record.getRowKind()) {
-            case DELETE:
-            case UPDATE_BEFORE:
-                opType = "DELETE";
-                break;
-            case INSERT:
-            case UPDATE_AFTER:
-                opType = "INSERT";
-                break;
-            default:
-                throw new IllegalStateException("the record only have states 
in DELETE, "
-                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
-        }
-        return opType;
-    }
-
-    private static String getDebeziumOpType(GenericRowData record) {
-        String opType;
-        switch (record.getRowKind()) {
-            case DELETE:
-            case UPDATE_BEFORE:
-                opType = "d";
-                break;
-            case INSERT:
-            case UPDATE_AFTER:
-                opType = "c";
-                break;
-            default:
-                throw new IllegalStateException("the record only have states 
in DELETE, "
-                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
-        }
-        return opType;
-    }
-
-    private static List<String> getPkNames(@Nullable TableChanges.TableChange 
tableSchema) {
-        if (tableSchema == null) {
-            return null;
-        }
-        return tableSchema.getTable().primaryKeyColumnNames();
-    }
-
-    public static Map<String, String> getMysqlType(@Nullable 
TableChanges.TableChange tableSchema) {
-        if (tableSchema == null) {
-            return null;
-        }
-        Map<String, String> mysqlType = new LinkedHashMap<>();
-        final Table table = tableSchema.getTable();
-        table.columns()
-                .forEach(
-                        column -> {
-                            mysqlType.put(
-                                    column.name(),
-                                    String.format(
-                                            "%s(%d)",
-                                            column.typeName(),
-                                            column.length()));
-                        });
-        return mysqlType;
-    }
-
-    /**
-     * get sql type from table schema, represents the jdbc data type
-     *
-     * @param tableSchema table schema
-     */
-    public static Map<String, Integer> getSqlType(@Nullable 
TableChanges.TableChange tableSchema) {
-        if (tableSchema == null) {
-            return null;
-        }
-        Map<String, Integer> sqlType = new LinkedHashMap<>();
-        final Table table = tableSchema.getTable();
-        table.columns().forEach(
-                column -> sqlType.put(column.name(), column.jdbcType()));
-        return sqlType;
-    }
-
-    public static String getMetaData(SourceRecord record, String tableNameKey) 
{
-        Struct messageStruct = (Struct) record.value();
-        Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
-        return sourceStruct.getString(tableNameKey);
-    }
-
     public String getKey() {
         return key;
     }
@@ -595,4 +430,4 @@ public enum MySqlReadableMetadata {
     public MetadataConverter getConverter() {
         return converter;
     }
-}
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
new file mode 100644
index 000000000..50ad44d2b
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.utils;
+
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
+import static 
org.apache.inlong.sort.cdc.mysql.utils.OperationUtils.generateOperation;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.inlong.sort.cdc.base.util.RecordUtils;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils for generating metadata in mysql cdc.
+ */
+public class MetaDataUtils {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MetaDataUtils.class);
+
+    /**
+     * Collects the JDBC type code of every column of a table.
+     *
+     * @param tableSchema table schema, may be {@code null}
+     * @return column name to {@code jdbcType()} mapping in column iteration order,
+     *         or {@code null} when no schema is given
+     */
+    public static Map<String, Integer> getSqlType(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        final Table table = tableSchema.getTable();
+        final Map<String, Integer> jdbcTypes = new LinkedHashMap<>();
+        for (io.debezium.relational.Column col : table.columns()) {
+            jdbcTypes.put(col.name(), col.jdbcType());
+        }
+        return jdbcTypes;
+    }
+
+    /**
+     * Reads a string field from the source struct of a record's value.
+     *
+     * @param record source record whose value is a {@link Struct}
+     * @param tableNameKey name of the field to read from the source struct
+     * @return the string stored under that field
+     */
+    public static String getMetaData(SourceRecord record, String tableNameKey) {
+        final Struct value = (Struct) record.value();
+        return value.getStruct(FieldName.SOURCE).getString(tableNameKey);
+    }
+
+    /**
+     * Maps the envelope operation of a record to an upper-case operation name.
+     *
+     * @param record source record to inspect
+     * @return "INSERT" for create and read operations, "DELETE" for deletes, "UPDATE" otherwise
+     */
+    public static String getOpType(SourceRecord record) {
+        final Envelope.Operation operation = Envelope.operationFor(record);
+        if (operation == Envelope.Operation.DELETE) {
+            return "DELETE";
+        }
+        if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
+            return "INSERT";
+        }
+        return "UPDATE";
+    }
+
+    /**
+     * Derives the canal operation type from the row kind of a record.
+     *
+     * @param record row whose kind is inspected
+     * @return "DELETE" for DELETE and UPDATE_BEFORE rows, "INSERT" for INSERT and UPDATE_AFTER rows
+     * @throws IllegalStateException if the row kind is none of the four handled kinds
+     */
+    public static String getCanalOpType(GenericRowData record) {
+        switch (record.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return "INSERT";
+            case DELETE:
+            case UPDATE_BEFORE:
+                return "DELETE";
+            default:
+                throw new IllegalStateException("the record only have states in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+    }
+
+    /**
+     * Derives the debezium operation code from the row kind of a record.
+     *
+     * @param record row whose kind is inspected
+     * @return "d" for DELETE and UPDATE_BEFORE rows, "c" for INSERT and UPDATE_AFTER rows
+     * @throws IllegalStateException if the row kind is none of the four handled kinds
+     */
+    public static String getDebeziumOpType(GenericRowData record) {
+        switch (record.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return "c";
+            case DELETE:
+            case UPDATE_BEFORE:
+                return "d";
+            default:
+                throw new IllegalStateException("the record only have states in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+    }
+
+    /**
+     * Returns the primary key column names of a table.
+     *
+     * @param tableSchema table schema, may be {@code null}
+     * @return primary key column names of the table, or {@code null} when no schema is given
+     */
+    public static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) {
+        return tableSchema == null ? null : tableSchema.getTable().primaryKeyColumnNames();
+    }
+
+    /**
+     * Renders the type of every column of a table as {@code typeName(length)}.
+     *
+     * @param tableSchema table schema, may be {@code null}
+     * @return column name to rendered type mapping in column iteration order,
+     *         or {@code null} when no schema is given
+     */
+    public static Map<String, String> getMysqlType(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        final Table table = tableSchema.getTable();
+        final Map<String, String> typeByColumn = new LinkedHashMap<>();
+        for (io.debezium.relational.Column col : table.columns()) {
+            // the length is always printed, whatever value the column reports for it
+            final String rendered = String.format("%s(%d)", col.typeName(), col.length());
+            typeByColumn.put(col.name(), rendered);
+        }
+        return typeByColumn;
+    }
+
+    /**
+     * Builds a {@link CanalJson} for a record and serializes it to a json string.
+     *
+     * <p>For DDL records the statement is read from the {@code DDL_FIELD_NAME} entry of the row's
+     * first field, the parsed operation is attached and the data list stays empty. For all other
+     * records the record timestamp is set and the first field becomes the single data row.
+     *
+     * @param record source record providing the value and source structs
+     * @param rowData row whose first field is read as a map and whose row kind gives the type
+     * @param tableSchema table schema used for primary keys and column types
+     * @return the serialized canal json
+     * @throws IllegalStateException if filling or serializing the canal json fails
+     */
+    public static StringData getCanalData(SourceRecord record, GenericRowData rowData,
+            TableChange tableSchema) {
+        final Struct value = (Struct) record.value();
+        final Struct source = value.getStruct(FieldName.SOURCE);
+        final String table = getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
+        final String database = getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
+        final long opTs = (Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY);
+        // field 0 of the row is read as a map: it holds the DDL statement for DDL records
+        // and is used as the data row otherwise
+        final Map<String, Object> field = (Map<String, Object>) rowData.getField(0);
+        final List<Map<String, Object>> rows = new ArrayList<>();
+
+        final CanalJson canalJson = CanalJson.builder()
+                .database(database)
+                .es(opTs)
+                .pkNames(getPkNames(tableSchema))
+                .mysqlType(getMysqlType(tableSchema))
+                .table(table)
+                .incremental(!isSnapshotRecord(source))
+                .dataSourceName(source.getString(AbstractSourceInfo.SERVER_NAME_KEY))
+                .type(getCanalOpType(rowData))
+                .sqlType(getSqlType(tableSchema))
+                .build();
+
+        try {
+            final boolean ddl = RecordUtils.isDdlRecord(value);
+            canalJson.setDdl(ddl);
+            if (ddl) {
+                final String sql = (String) field.get(DDL_FIELD_NAME);
+                canalJson.setSql(sql);
+                canalJson.setOperation(generateOperation(sql, getSqlType(tableSchema)));
+            } else {
+                canalJson.setTs((Long) value.get(FieldName.TIMESTAMP));
+                rows.add(field);
+            }
+            canalJson.setData(rows);
+            LOG.debug("canal json: {}", canalJson);
+            return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
+        } catch (Exception e) {
+            throw new IllegalStateException("exception occurs when get meta data", e);
+        }
+    }
+
+    /**
+     * Builds a {@link DebeziumJson} for a record and serializes it to a json string.
+     *
+     * <p>For DDL records the statement is read from the {@code DDL_FIELD_NAME} entry of the row's
+     * first field, the parsed operation is attached and {@code after} is an empty map. For all
+     * other records the first field of the row becomes {@code after}.
+     *
+     * @param record source record providing the value and source structs
+     * @param tableSchema table schema used for primary keys and column types
+     * @param rowData row whose first field is read as a map and whose row kind gives the op code
+     * @return the serialized debezium json
+     * @throws IllegalStateException if filling or serializing the debezium json fails
+     */
+    public static StringData getDebeziumData(SourceRecord record, TableChange tableSchema,
+            GenericRowData rowData) {
+        // construct debezium json
+        Struct messageStruct = (Struct) record.value();
+        Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+        Map<String, Object> field = (Map<String, Object>) rowData.getField(0);
+
+        Source source = Source.builder().db(getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY))
+                .table(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY))
+                .name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
+                .sqlType(getSqlType(tableSchema))
+                .pkNames(getPkNames(tableSchema))
+                .mysqlType(getMysqlType(tableSchema))
+                .build();
+        DebeziumJson debeziumJson = DebeziumJson.builder().source(source)
+                .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(rowData))
+                .dataSourceName(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
+                .tableChange(tableSchema).incremental(!isSnapshotRecord(sourceStruct)).build();
+
+        try {
+            if (RecordUtils.isDdlRecord(messageStruct)) {
+                String sql = (String) field.get(DDL_FIELD_NAME);
+                debeziumJson.setDdl(sql);
+                debeziumJson.setOperation(generateOperation(sql, getSqlType(tableSchema)));
+                debeziumJson.setAfter(new HashMap<>());
+            } else {
+                debeziumJson.setAfter(field);
+            }
+            return StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
+        } catch (Exception e) {
+            // exception messages are not formatted by the logger, so no "{}" placeholder here
+            throw new IllegalStateException("exception occurs when get meta data", e);
+        }
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
new file mode 100644
index 000000000..51c2533a0
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.utils;
+
+import static 
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.parseColumnWithPosition;
+import static 
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.parseColumns;
+import static 
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.parseComment;
+import static 
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.reformatName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.alter.RenameTableStatement;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import net.sf.jsqlparser.statement.drop.Drop;
+import net.sf.jsqlparser.statement.truncate.Truncate;
+import org.apache.commons.lang.StringUtils;
+import 
org.apache.inlong.sort.cdc.base.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.IndexType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.indexes.Index;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.DropTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.ddl.operations.RenameTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.TruncateTableOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils for generating an operation from a statement parsed by sqlParser.
+ */
+public class OperationUtils {
+
+    // logger bound to this class so its messages are attributed to OperationUtils
+    private static final Logger LOG = LoggerFactory.getLogger(OperationUtils.class);
+    public static final String PRIMARY_KEY = "PRIMARY KEY";
+    public static final String NORMAL_INDEX = "NORMAL_INDEX";
+    public static final String FIRST = "FIRST";
+
+    /**
+     * generate operation from sql and table schema.
+     * A trailing FIRST position keyword is stripped before parsing and passed on as a flag,
+     * because the sql parser doesn't support it.
+     * @param sql sql from binlog
+     * @param sqlType table sql types
+     * @return Operation, or null when the statement is not supported or can't be parsed
+     */
+    public static Operation generateOperation(String sql, Map<String, Integer> sqlType) {
+        try {
+            // now sqlParser don't support first position
+            // remove it first and add it later
+            boolean endsWithFirst = hasTrailingFirstKeyword(sql);
+            if (endsWithFirst) {
+                sql = removeFirstFlag(sql);
+            }
+            Statement statement = CCJSqlParserUtil.parse(sql);
+            if (statement instanceof Alter) {
+                return parseAlterOperation(
+                        (Alter) statement, sqlType, endsWithFirst);
+            } else if (statement instanceof CreateTable) {
+                return parseCreateTableOperation(
+                        (CreateTable) statement, sqlType);
+            } else if (statement instanceof Drop) {
+                return new DropTableOperation();
+            } else if (statement instanceof Truncate) {
+                return new TruncateTableOperation();
+            } else if (statement instanceof RenameTableStatement) {
+                return new RenameTableOperation();
+            } else {
+                LOG.warn("doesn't support sql {}, statement {}", sql, statement);
+            }
+        } catch (Exception e) {
+            LOG.error("parse ddl error: {}, set ddl to null", sql, e);
+        }
+        return null;
+    }
+
+    /**
+     * check whether the sql ends with FIRST as a standalone keyword.
+     * A bare suffix check would also match identifiers such as "is_first" and cut them in half,
+     * so the keyword must be preceded by whitespace.
+     * @param sql sql from binlog, may be null
+     * @return true if the last token of the sql is the FIRST keyword
+     */
+    private static boolean hasTrailingFirstKeyword(String sql) {
+        return StringUtils.endsWithIgnoreCase(sql, FIRST)
+                && sql.length() > FIRST.length()
+                && Character.isWhitespace(sql.charAt(sql.length() - FIRST.length() - 1));
+    }
+
+    /**
+     * build an alter operation from the alter expressions of a parsed statement.
+     * DROP, ADD, RENAME, MODIFY and CHANGE expressions each produce one alter column,
+     * any other expression is only logged.
+     * @param statement alter statement
+     * @param sqlType sql types
+     * @param isFirst whether the column is first
+     * @return AlterOperation
+     */
+    private static AlterOperation parseAlterOperation(Alter statement,
+            Map<String, Integer> sqlType, boolean isFirst) {
+
+        List<AlterColumn> alterColumns = new ArrayList<>();
+        for (net.sf.jsqlparser.statement.alter.AlterExpression expression : statement.getAlterExpressions()) {
+            switch (expression.getOperation()) {
+                case DROP: {
+                    Column dropped = Column.builder()
+                            .name(reformatName(expression.getColumnName()))
+                            .build();
+                    alterColumns.add(new AlterColumn(AlterType.DROP_COLUMN, null, dropped));
+                    break;
+                }
+                case ADD: {
+                    // NOTE(review): the column data type list is dereferenced without a null check;
+                    // confirm the parser always fills it for ADD expressions
+                    Column added = parseColumnWithPosition(isFirst, sqlType,
+                            expression.getColDataTypeList().get(0));
+                    alterColumns.add(new AlterColumn(AlterType.ADD_COLUMN, added, null));
+                    break;
+                }
+                case RENAME: {
+                    Column renamedTo = new Column(reformatName(expression.getColumnName()));
+                    Column renamedFrom = new Column(reformatName(expression.getColumnOldName()));
+                    alterColumns.add(new AlterColumn(AlterType.CHANGE_COLUMN, renamedTo, renamedFrom));
+                    break;
+                }
+                case MODIFY:
+                    // modify column use change column type
+                case CHANGE: {
+                    // NOTE(review): the old column is taken from getColumnOldName() for MODIFY too;
+                    // confirm the parser provides it for MODIFY expressions
+                    Column changedTo = parseColumnWithPosition(isFirst, sqlType,
+                            expression.getColDataTypeList().get(0));
+                    Column changedFrom = new Column(reformatName(expression.getColumnOldName()));
+                    alterColumns.add(new AlterColumn(AlterType.CHANGE_COLUMN, changedTo, changedFrom));
+                    break;
+                }
+                default:
+                    LOG.warn("doesn't support alter operation {}, statement {}",
+                            expression.getOperation(), statement);
+            }
+        }
+
+        return new AlterOperation(alterColumns);
+    }
+
+    /**
+     * build a create table operation from a parsed create table statement.
+     * When the statement has a like table only the like table is set,
+     * otherwise columns, indexes and comment are filled in.
+     * @param statement create table statement
+     * @param sqlType sql types
+     * @return CreateTableOperation
+     */
+    private static CreateTableOperation parseCreateTableOperation(
+            CreateTable statement, Map<String, Integer> sqlType) {
+
+        CreateTableOperation operation = new CreateTableOperation();
+        if (statement.getLikeTable() != null) {
+            operation.setLikeTable(parseLikeTable(statement));
+            return operation;
+        }
+
+        List<ColumnDefinition> definitions = statement.getColumnDefinitions();
+        operation.setColumns(parseColumns(sqlType, definitions));
+        operation.setIndexes(parseIndexes(statement));
+        operation.setComment(parseComment(statement.getTableOptionsStrings()));
+        return operation;
+    }
+
+    /**
+     * parse indexes from statement
+     * only primary key and normal index types are recognized; an index of any other
+     * or missing type is still returned, but without an index type.
+     * @param statement create table statement
+     * @return list of indexes, empty when the statement has none
+     */
+    private static List<Index> parseIndexes(CreateTable statement) {
+
+        List<Index> indexList = new ArrayList<>();
+        if (statement.getIndexes() == null) {
+            return indexList;
+        }
+
+        for (net.sf.jsqlparser.statement.create.table.Index perIndex : statement.getIndexes()) {
+            Index index = new Index();
+            // compare null-safely and ignoring case: a switch on the raw type string is
+            // case-sensitive and throws NullPointerException when the type is null
+            // NOTE(review): the parser presumably reports plain indexes with the keyword written
+            // in the statement (e.g. INDEX or KEY) rather than NORMAL_INDEX - confirm
+            String indexType = perIndex.getType();
+            if (PRIMARY_KEY.equalsIgnoreCase(indexType)) {
+                index.setIndexType(IndexType.PRIMARY_KEY);
+            } else if (NORMAL_INDEX.equalsIgnoreCase(indexType)) {
+                index.setIndexType(IndexType.NORMAL_INDEX);
+            } else {
+                LOG.error("unsupported index type {}", indexType);
+            }
+            List<String> columns = new ArrayList<>();
+            perIndex.getColumnsNames().forEach(columnName -> columns.add(reformatName(columnName)));
+            index.setIndexName(reformatName(perIndex.getName()));
+            index.setIndexColumns(columns);
+            indexList.add(index);
+        }
+
+        return indexList;
+
+    }
+
+    /**
+     * cut the sql at the last occurrence of the first flag, ignoring case.
+     * Everything before the flag is kept as is.
+     * @param sql sql from binlog, expected to contain the first flag
+     * @return sql without first flag
+     */
+    private static String removeFirstFlag(String sql) {
+        final int flagStart = StringUtils.lastIndexOfIgnoreCase(sql, FIRST);
+        return sql.substring(0, flagStart);
+    }
+
+    /**
+     * read the name of the like table of a create table statement.
+     * @param statement create table statement
+     * @return like table name, or an empty string when the statement has no like table
+     */
+    private static String parseLikeTable(CreateTable statement) {
+        return statement.getLikeTable() == null ? "" : statement.getLikeTable().getName();
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
new file mode 100644
index 000000000..fdc990fa7
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc;
+
+import java.util.HashMap;
+import org.apache.inlong.sort.cdc.mysql.utils.OperationUtils;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for table operations
+ */
+public class TestOperation {
+
+    /**
+     * A rename table statement must produce an operation of type RENAME.
+     */
+    @Test
+    public void testRenameTableOperation() {
+        String sql = "rename table `tv3` to `tv4`";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        sqlType.put("tv3", 1);
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        // fail through JUnit instead of the assert keyword, which is skipped without -ea
+        Assert.assertNotNull(operation);
+        Assert.assertEquals(OperationType.RENAME, operation.getOperationType());
+    }
+
+    /**
+     * A drop table statement must produce an operation of type DROP.
+     */
+    @Test
+    public void testDropTableOperation() {
+        String sql = "drop table `tv3`";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        sqlType.put("tv3", 1);
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        // fail through JUnit instead of the assert keyword, which is skipped without -ea
+        Assert.assertNotNull(operation);
+        Assert.assertEquals(OperationType.DROP, operation.getOperationType());
+    }
+
+    /**
+     * An add column statement ending with "first" must produce an ADD_COLUMN alter
+     * whose new column is positioned FIRST without a reference column.
+     */
+    @Test
+    public void testAddColumnOperation() {
+        String sql = "alter table a add column b int comment \"test\" first";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        sqlType.put("b", 1);
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        // fail through JUnit instead of the assert keyword, which is skipped without -ea
+        Assert.assertNotNull(operation);
+        Assert.assertEquals(OperationType.ALTER, operation.getOperationType());
+        AlterColumn alterColumn = ((AlterOperation) operation).getAlterColumns().get(0);
+        Assert.assertEquals(AlterType.ADD_COLUMN, alterColumn.getAlterType());
+        Assert.assertEquals("b", alterColumn.getNewColumn().getName());
+        Assert.assertEquals(PositionType.FIRST, alterColumn.getNewColumn().getPosition().getPositionType());
+        Assert.assertNull(alterColumn.getNewColumn().getPosition().getColumnName());
+    }
+
+    /**
+     * A change column statement must produce a CHANGE_COLUMN alter
+     * carrying both the new and the old column name.
+     */
+    @Test
+    public void testRenameColumnOperation() {
+        String sql = "alter table a CHANGE b c int";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        sqlType.put("c", 1);
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        // fail through JUnit instead of the assert keyword, which is skipped without -ea
+        Assert.assertNotNull(operation);
+        Assert.assertEquals(OperationType.ALTER, operation.getOperationType());
+        AlterColumn alterColumn = ((AlterOperation) operation).getAlterColumns().get(0);
+        Assert.assertEquals(AlterType.CHANGE_COLUMN, alterColumn.getAlterType());
+        Assert.assertEquals("c", alterColumn.getNewColumn().getName());
+        Assert.assertEquals("b", alterColumn.getOldColumn().getName());
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-json/pom.xml 
b/inlong-sort/sort-formats/format-json/pom.xml
index a108c1eaf..93228f370 100644
--- a/inlong-sort/sort-formats/format-json/pom.xml
+++ b/inlong-sort/sort-formats/format-json/pom.xml
@@ -78,7 +78,11 @@
             <artifactId>debezium-core</artifactId>
             <scope>provided</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
index c3cafc2b5..896299f69 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++ 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
@@ -19,13 +19,18 @@ package org.apache.inlong.sort.formats.json.canal;
 
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import lombok.Builder;
 import lombok.Data;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
 
 @Builder
+@JsonTypeName("canalJson")
 @JsonInclude(Include.NON_NULL)
 @Data
 public class CanalJson {
@@ -56,7 +61,46 @@ public class CanalJson {
     private String schema;
     @JsonProperty("oracleType")
     private Map<String, String> oracleType;
+    @JsonProperty("operation")
+    private Operation operation;
     @JsonProperty("incremental")
     private Boolean incremental;
+    @JsonProperty("dataSourceName")
+    private String dataSourceName;
+
+    /**
+     * Creates a canal json message from all of its properties.
+     * Annotated as the json creator, every parameter is bound by the json property it names.
+     */
+    @JsonCreator
+    public CanalJson(
+            @Nullable @JsonProperty("data") List<Map<String, Object>> data,
+            @JsonProperty("es") long es,
+            @JsonProperty("table") String table,
+            @JsonProperty("type") String type,
+            @JsonProperty("database") String database,
+            @JsonProperty("ts") long ts,
+            @JsonProperty("sql") String sql,
+            @Nullable @JsonProperty("mysqlType") Map<String, String> mysqlType,
+            @Nullable @JsonProperty("sqlType") Map<String, Integer> sqlType,
+            @JsonProperty("isDdl") boolean isDdl,
+            @Nullable @JsonProperty("pkNames") List<String> pkNames,
+            @JsonProperty("schema") String schema,
+            @Nullable @JsonProperty("oracleType") Map<String, String> oracleType,
+            @JsonProperty("operation") Operation operation,
+            @JsonProperty("incremental") Boolean incremental,
+            @JsonProperty("dataSourceName") String dataSourceName) {
+        // where the message comes from
+        this.dataSourceName = dataSourceName;
+        this.database = database;
+        this.schema = schema;
+        this.table = table;
+        this.incremental = incremental;
+        // timing
+        this.es = es;
+        this.ts = ts;
+        // table structure
+        this.pkNames = pkNames;
+        this.sqlType = sqlType;
+        this.mysqlType = mysqlType;
+        this.oracleType = oracleType;
+        // payload
+        this.type = type;
+        this.data = data;
+        this.isDdl = isDdl;
+        this.sql = sql;
+        this.operation = operation;
+    }
 
 }
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
index feec23042..d736d2467 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++ 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
@@ -18,22 +18,64 @@
 package org.apache.inlong.sort.formats.json.debezium;
 
 import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
 import java.util.List;
 import java.util.Map;
 import lombok.Builder;
 import lombok.Data;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
 
-@Data
 @Builder
+@JsonTypeName("debeziumJson")
+@JsonInclude(Include.NON_NULL)
+@Data
 public class DebeziumJson {
 
+    @JsonProperty("before")
     private Map<String, String> before;
+    @JsonProperty("after")
     private Map<String, Object> after;
+    @JsonProperty("source")
     private Source source;
+    @JsonProperty("tableChange")
     private TableChanges.TableChange tableChange;
+    @JsonProperty("tsMs")
     private long tsMs;
+    @JsonProperty("op")
     private String op;
-    private Boolean incremental;
+    @JsonProperty("incremental")
+    private boolean incremental;
+    @JsonProperty("ddl")
+    private String ddl;
+    @JsonProperty("operation")
+    private Operation operation;
+    @JsonProperty("dataSourceName")
+    private String dataSourceName;
+
+    /**
+     * Creates a debezium json message from all of its properties.
+     * Every parameter is annotated with the json property it is bound to.
+     */
+    public DebeziumJson(
+            @JsonProperty("before") Map<String, String> before,
+            @JsonProperty("after") Map<String, Object> after,
+            @JsonProperty("source") Source source,
+            @JsonProperty("tableChange") TableChange tableChange,
+            @JsonProperty("tsMs") long tsMs,
+            @JsonProperty("op") String op,
+            @JsonProperty("incremental") boolean incremental,
+            @JsonProperty("ddl") String ddl,
+            @JsonProperty("operation") Operation operation,
+            @JsonProperty("dataSourceName") String dataSourceName) {
+        // where the message comes from
+        this.dataSourceName = dataSourceName;
+        this.source = source;
+        this.tableChange = tableChange;
+        this.incremental = incremental;
+        this.tsMs = tsMs;
+        // payload
+        this.op = op;
+        this.before = before;
+        this.after = after;
+        this.ddl = ddl;
+        this.operation = operation;
+    }
 
     @Builder
     @Data
diff --git 
a/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationTest.java
 
b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationTest.java
new file mode 100644
index 000000000..2384e2162
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import java.util.ArrayList;
+import java.util.List;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Round-trip serialization test for {@link CanalJson} carrying a DDL {@link AlterOperation}.
+ */
+public class CanalJsonSerializationTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CanalJsonSerializationTest.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Serializes a CanalJson holding an ADD_COLUMN alter operation and reads the JSON back.
+     * Any JsonProcessingException is rethrown as a RuntimeException so the test fails.
+     */
+    @Test
+    public void testCanalJsonSerialization() {
+        List<AlterColumn> alterColumns = new ArrayList<>();
+
+        Column column = new Column("columnDataType.getColumnName()", new ArrayList<>(), 1,
+                new Position(PositionType.FIRST, null), true, "23", "23");
+
+        alterColumns.add(new AlterColumn(AlterType.ADD_COLUMN, column, null));
+
+        AlterOperation alterOperation = new AlterOperation(alterColumns);
+
+        CanalJson canalJson = CanalJson.builder()
+                .data(null)
+                .es(0)
+                .table("table")
+                .type("type")
+                .database("database")
+                .ts(0)
+                .sql("sql")
+                .mysqlType(null)
+                .sqlType(null)
+                .pkNames(null)
+                .schema("schema")
+                .oracleType(null)
+                .operation(alterOperation)
+                .incremental(false)
+                .build();
+
+        try {
+            // One shared mapper handles both directions; no second instance is needed.
+            String writeValueAsString = objectMapper.writeValueAsString(canalJson);
+            LOG.info(writeValueAsString);
+            objectMapper.readValue(writeValueAsString, CanalJson.class);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git 
a/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/DebeziumJsonSerializationTest.java
 
b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/DebeziumJsonSerializationTest.java
new file mode 100644
index 000000000..c3f4bdd71
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/DebeziumJsonSerializationTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Round-trip serialization test for {@link DebeziumJson} carrying a DDL {@link AlterOperation}.
+ */
+public class DebeziumJsonSerializationTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumJsonSerializationTest.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Serializes a DebeziumJson holding an ADD_COLUMN alter operation and reads the JSON back.
+     * Any JsonProcessingException is rethrown as a RuntimeException so the test fails.
+     */
+    @Test
+    public void testDebeziumJsonSerialization() {
+        List<AlterColumn> alterColumns = new ArrayList<>();
+
+        Column column = new Column("columnDataType.getColumnName()", new ArrayList<>(), 1,
+                new Position(PositionType.FIRST, null), true, "23", "23");
+
+        alterColumns.add(new AlterColumn(AlterType.ADD_COLUMN, column, null));
+
+        AlterOperation alterOperation = new AlterOperation(alterColumns);
+
+        DebeziumJson debeziumJson = DebeziumJson.builder().source(null)
+                .dataSourceName("dataSourceName")
+                .tableChange(null).incremental(false).build();
+
+        debeziumJson.setDdl("");
+        debeziumJson.setOperation(alterOperation);
+        debeziumJson.setAfter(new HashMap<>());
+
+        try {
+            // One shared mapper handles both directions; no second instance is needed.
+            String writeValueAsString = objectMapper.writeValueAsString(debeziumJson);
+            LOG.info(writeValueAsString);
+            objectMapper.readValue(writeValueAsString, DebeziumJson.class);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 057dc5d77..40ab5780a 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -873,7 +873,7 @@ The text of each license is the standard Apache 2.0 license.
   net.sf.jpam:jpam:1.1 - JPam (http://jpam.sourceforge.net/), (The Apache 
Software License, Version 2.0)
   com.cedarsoftware:json-io:2.5.1 - Java JSON serialization 
(https://github.com/jdereg/json-io), (Apache License, Version 2.0)
   net.minidev:json-smart:2.3 - JSON Small and Fast Parser 
(https://github.com/netplex/json-smart-v2/tree/v2.3), (The Apache Software 
License, Version 2.0)
-  com.github.jsqlparser:jsqlparser:2.1 - JSQLParser library 
(https://github.com/JSQLParser/JSqlParser), (The Apache Software License, 
Version 2.0;  GNU Library or Lesser General Public License (LGPL) V2.1)
+  com.github.jsqlparser:jsqlparser:4.2 - JSQLParser library 
(https://github.com/JSQLParser/JSqlParser), (The Apache Software License, 
Version 2.0;  GNU Library or Lesser General Public License (LGPL) V2.1)
   org.apache.kerby:kerb-admin:1.0.1 - Kerby-kerb Admin 
(http://directory.apache.org/kerby/kerby-kerb/kerb-admin), (Apache License, 
Version 2.0)
   org.apache.kerby:kerb-client:1.0.1 - Kerby-kerb Client 
(http://directory.apache.org/kerby/kerby-kerb/kerb-client), (Apache License, 
Version 2.0)
   org.apache.kerby:kerb-common:1.0.1 - Kerby-kerb Common 
(http://directory.apache.org/kerby/kerby-kerb/kerb-common), (Apache License, 
Version 2.0)
diff --git a/pom.xml b/pom.xml
index 62e5a5ad0..56dbf6dfb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -208,6 +208,7 @@
         <tomcat.version>8.5.53</tomcat.version>
         <jedis.version>2.9.0</jedis.version>
         <poi.version>5.2.3</poi.version>
+        <jsqlparser.version>4.2</jsqlparser.version>
     </properties>
 
     <dependencyManagement>
@@ -233,6 +234,11 @@
                 <artifactId>flume-ng-configuration</artifactId>
                 <version>${flume.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.jsqlparser</groupId>
+                <artifactId>jsqlparser</artifactId>
+                <version>${jsqlparser.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.zaxxer</groupId>
                 <artifactId>HikariCP</artifactId>

Reply via email to