This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ab170aaa1 [INLONG-7660][Sort] Support DDL model for MySQL connector
when running in all migrate mode (#7846)
ab170aaa1 is described below
commit ab170aaa18eff53cc8b9dabbaad647e4fa6b2cd2
Author: Schnapps <[email protected]>
AuthorDate: Wed Apr 19 19:08:26 2023 +0800
[INLONG-7660][Sort] Support DDL model for MySQL connector when running in
all migrate mode (#7846)
---
inlong-common/pom.xml | 4 +
.../sort/protocol/ddl/Utils/ColumnUtils.java | 168 +++++++++++++++
.../inlong/sort/protocol/ddl/enums/AlterType.java} | 39 +---
.../inlong/sort/protocol/ddl/enums/IndexType.java} | 37 +---
.../sort/protocol/ddl/enums/OperationType.java} | 43 ++--
.../sort/protocol/ddl/enums/PositionType.java} | 42 ++--
.../protocol/ddl/expressions/AlterColumn.java} | 63 +++---
.../sort/protocol/ddl/expressions/Column.java | 68 ++++++
.../sort/protocol/ddl/expressions/Position.java} | 53 ++---
.../inlong/sort/protocol/ddl/indexes/Index.java} | 32 +--
.../protocol/ddl/operations/AlterOperation.java} | 53 ++---
.../ddl/operations/CreateTableOperation.java | 70 +++++++
.../ddl/operations/DropTableOperation.java} | 50 ++---
.../sort/protocol/ddl/operations/Operation.java | 49 +++++
.../ddl/operations/RenameTableOperation.java} | 50 ++---
.../ddl/operations/TruncateTableOperation.java} | 50 ++---
.../table/RowDataDebeziumDeserializeSchema.java | 1 +
.../mysql/source/reader/MySqlRecordEmitter.java | 44 +++-
.../cdc/mysql/table/MySqlReadableMetadata.java | 177 +---------------
.../inlong/sort/cdc/mysql/utils/MetaDataUtils.java | 233 +++++++++++++++++++++
.../sort/cdc/mysql/utils/OperationUtils.java | 227 ++++++++++++++++++++
.../org/apache/inlong/sort/cdc/TestOperation.java | 85 ++++++++
inlong-sort/sort-formats/format-json/pom.xml | 6 +-
.../inlong/sort/formats/json/canal/CanalJson.java | 44 ++++
.../sort/formats/json/debezium/DebeziumJson.java | 46 +++-
.../json/canal/CanalJsonSerializationTest.java | 85 ++++++++
.../json/canal/DebeziumJsonSerializationTest.java | 78 +++++++
licenses/inlong-sort-connectors/LICENSE | 2 +-
pom.xml | 6 +
29 files changed, 1382 insertions(+), 523 deletions(-)
diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index c2ed3ba40..882f33f02 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -84,6 +84,10 @@
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.jsqlparser</groupId>
+ <artifactId>jsqlparser</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/Utils/ColumnUtils.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/Utils/ColumnUtils.java
new file mode 100644
index 000000000..84f74d567
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/Utils/ColumnUtils.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column.ColumnBuilder;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+
+/**
+ * Utils for parse from statement in sqlParser to a column object.
+ */
+public class ColumnUtils {
+
+ public static final String DEFAULT = "default";
+ public static final String NULL = "null";
+ public static final String NOT = "not";
+ public static final String COMMENT = "comment";
+ public static final String AFTER = "after";
+
+ /**
+ * parse column definition to a Column object
+ * this method is used for alter operation where a first flag is passed
+ * to determine whether the column is in the first position of one table.
+ */
+ public static Column parseColumnWithPosition(boolean isFirst,
+ Map<String, Integer> sqlType,
+ ColumnDefinition columnDefinition) {
+
+ ColDataType colDataType = columnDefinition.getColDataType();
+
+ List<String> definitions = new ArrayList<>();
+ if (colDataType.getArgumentsStringList() != null) {
+ definitions.addAll(colDataType.getArgumentsStringList());
+ }
+
+ List<String> columnSpecs = columnDefinition.getColumnSpecs();
+
+ ColumnBuilder columnBuilder = Column.builder();
+ String columnName = reformatName(columnDefinition.getColumnName());
+ columnBuilder.name(columnName)
+ .definition(definitions).isNullable(parseNullable(columnSpecs))
+ .defaultValue(parseDefaultValue(columnSpecs))
+ .jdbcType(sqlType.get(columnName))
+ .comment(parseComment(columnSpecs));
+
+ if (isFirst) {
+ // the column is in the first position of one table
+ columnBuilder.position(new Position(PositionType.FIRST, null));
+ } else {
+ columnBuilder.position(parsePosition(columnSpecs));
+ }
+
+ return columnBuilder.build();
+ }
+
+ /**
+ * parse column definitions to Column list.
+ * this method is used for createTable operation.
+ * @param sqlType the sql type map
+ * @param columnDefinitions the column definition list
+ * @return the column list
+ */
+ public static List<Column> parseColumns(Map<String, Integer> sqlType,
+ List<ColumnDefinition> columnDefinitions) {
+ List<Column> columns = new ArrayList<>();
+ columnDefinitions.forEach(columnDefinition -> {
+ columns.add(parseColumnWithPosition(false, sqlType,
columnDefinition));
+ });
+ return columns;
+ }
+
+ public static String parseDefaultValue(List<String> specs) {
+ return removeContinuousQuotes(parseAdjacentString(specs, DEFAULT,
false));
+ }
+
+ public static boolean parseNullable(List<String> specs) {
+ return !parseAdjacentString(specs, NULL, true).equalsIgnoreCase(NOT);
+ }
+
+ public static String parseComment(List<String> specs) {
+ return removeContinuousQuotes(parseAdjacentString(specs, COMMENT,
false));
+ }
+
+ public static Position parsePosition(List<String> specs) {
+ String afterColumn = reformatName(parseAdjacentString(specs, AFTER,
false));
+ if (!afterColumn.isEmpty()) {
+ return new Position(PositionType.AFTER, afterColumn);
+ }
+ return null;
+ }
+
+ /**
+ * get the string before or after the specific string in a list
+ * @param stringList the string list
+ * @param specificString the specific string
+ * @param front is front of the specific string
+ * @return the string before or after the specific string
+ */
+ public static String parseAdjacentString(List<String> stringList,
+ String specificString, boolean front) {
+
+ if (stringList == null || stringList.isEmpty()) {
+ return "";
+ }
+
+ for (int i = 0; i < stringList.size(); i++) {
+ if (stringList.get(i).equalsIgnoreCase(specificString)) {
+ if (front && i > 0) {
+ return stringList.get(i - 1);
+ } else if (i < stringList.size() - 1) {
+ return stringList.get(i + 1);
+ }
+ }
+ }
+ return "";
+
+ }
+
+ /**
+ * remove the continuous char in the string from both sides.
+ * @param str the input string, target the char to be removed
+ * @return the string without continuous chars from both sides
+ */
+ public static String removeContinuousChar(String str, char target) {
+ if (str == null || str.length() < 2) {
+ return str;
+ }
+ int start = 0;
+ int end = str.length() - 1;
+ while (start <= end && str.charAt(start) == target) {
+ start++;
+ }
+ while (end >= start && str.charAt(end) == target) {
+ end--;
+ }
+ return str.substring(start, end + 1);
+ }
+
+ public static String removeContinuousQuotes(String str) {
+ return removeContinuousChar(str, '\'');
+ }
+
+ public static String reformatName(String str) {
+ return removeContinuousChar(str, '`');
+ }
+
+}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
similarity index 51%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
index feec23042..c6813b399 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
@@ -15,36 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.enums;
-import io.debezium.relational.history.TableChanges;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder
-public class DebeziumJson {
-
- private Map<String, String> before;
- private Map<String, Object> after;
- private Source source;
- private TableChanges.TableChange tableChange;
- private long tsMs;
- private String op;
- private Boolean incremental;
-
- @Builder
- @Data
- public static class Source {
+/**
+ * Alter type for alter column operation
+ */
+public enum AlterType {
- private String name;
- private String db;
- private String table;
- private List<String> pkNames;
- private Map<String, Integer> sqlType;
- private Map<String, String> mysqlType;
- }
+ RENAME_COLUMN,
+ ADD_COLUMN,
+ DROP_COLUMN,
+ MODIFY_COLUMN,
+ CHANGE_COLUMN
}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/IndexType.java
similarity index 51%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/IndexType.java
index feec23042..c140e5b8d 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/IndexType.java
@@ -15,36 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.enums;
-import io.debezium.relational.history.TableChanges;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder
-public class DebeziumJson {
-
- private Map<String, String> before;
- private Map<String, Object> after;
- private Source source;
- private TableChanges.TableChange tableChange;
- private long tsMs;
- private String op;
- private Boolean incremental;
-
- @Builder
- @Data
- public static class Source {
+/**
+ * Index type for create table operation
+ * only support normal index and primary key
+ */
+public enum IndexType {
- private String name;
- private String db;
- private String table;
- private List<String> pkNames;
- private Map<String, Integer> sqlType;
- private Map<String, String> mysqlType;
- }
+ NORMAL_INDEX,
+ PRIMARY_KEY
}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/OperationType.java
similarity index 51%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/OperationType.java
index feec23042..b5b6dee82 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/OperationType.java
@@ -15,36 +15,21 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.enums;
-import io.debezium.relational.history.TableChanges;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder
-public class DebeziumJson {
-
- private Map<String, String> before;
- private Map<String, Object> after;
- private Source source;
- private TableChanges.TableChange tableChange;
- private long tsMs;
- private String op;
- private Boolean incremental;
-
- @Builder
- @Data
- public static class Source {
+/**
+ * Operation type for ddl operation
+ */
+public enum OperationType {
- private String name;
- private String db;
- private String table;
- private List<String> pkNames;
- private Map<String, Integer> sqlType;
- private Map<String, String> mysqlType;
- }
+ CREATE,
+ ALTER,
+ DROP,
+ RENAME,
+ TRUNCATE,
+ INSERT,
+ UPDATE,
+ DELETE,
+ OTHER
}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/PositionType.java
similarity index 51%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/PositionType.java
index feec23042..1eb15a59f 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/PositionType.java
@@ -15,36 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.enums;
-import io.debezium.relational.history.TableChanges;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder
-public class DebeziumJson {
-
- private Map<String, String> before;
- private Map<String, Object> after;
- private Source source;
- private TableChanges.TableChange tableChange;
- private long tsMs;
- private String op;
- private Boolean incremental;
-
- @Builder
- @Data
- public static class Source {
+/**
+ * Position type for add column operation
+ */
+public enum PositionType {
- private String name;
- private String db;
- private String table;
- private List<String> pkNames;
- private Map<String, Integer> sqlType;
- private Map<String, String> mysqlType;
- }
+ /**
+ * add column to first position of a table
+ */
+ FIRST,
+ /**
+ * add column after a certain column
+ */
+ AFTER
}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
similarity index 53%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
index c3cafc2b5..23b3e1b7e 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
@@ -15,48 +15,41 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.expressions;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
import lombok.Data;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
-@Builder
+/**
+ * Alter column expression.
+ */
@JsonInclude(Include.NON_NULL)
@Data
-public class CanalJson {
-
- @JsonProperty("data")
- private List<Map<String, Object>> data;
- @JsonProperty("es")
- private long es;
- @JsonProperty("table")
- private String table;
- @JsonProperty("type")
- private String type;
- @JsonProperty("database")
- private String database;
- @JsonProperty("ts")
- private long ts;
- @JsonProperty("sql")
- private String sql;
- @JsonProperty("mysqlType")
- private Map<String, String> mysqlType;
- @JsonProperty("sqlType")
- private Map<String, Integer> sqlType;
- @JsonProperty("isDdl")
- private boolean isDdl;
- @JsonProperty("pkNames")
- private List<String> pkNames;
- @JsonProperty("schema")
- private String schema;
- @JsonProperty("oracleType")
- private Map<String, String> oracleType;
- @JsonProperty("incremental")
- private Boolean incremental;
+public class AlterColumn {
+
+ @JsonProperty("alterType")
+ private AlterType alterType;
+
+ @JsonProperty("newColumn")
+ private Column newColumn;
+
+ @JsonProperty("oldColumn")
+ private Column oldColumn;
+
+ @JsonCreator
+ public AlterColumn(@JsonProperty("alterType") AlterType alterType,
+ @JsonProperty("newColumn") Column newColumn,
+ @JsonProperty("oldColumn") Column oldColumn) {
+ this.alterType = alterType;
+ this.newColumn = newColumn;
+ this.oldColumn = oldColumn;
+ }
+ public AlterColumn(@JsonProperty("alterType") AlterType alterType) {
+ this.alterType = alterType;
+ }
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java
new file mode 100644
index 000000000..612e0f7e9
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.expressions;
+
+import java.util.List;
+import lombok.Builder;
+import lombok.Data;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Column represents a column in a table.
+ */
+@Data
+@Builder
+@JsonInclude(Include.NON_NULL)
+public class Column {
+
+ @JsonProperty("name")
+ private String name;
+ @JsonProperty("definition")
+ private List<String> definition;
+ @JsonProperty("jdbcType")
+ private int jdbcType;
+ @JsonProperty("position")
+ private Position position;
+ @JsonProperty("isNullable")
+ private boolean isNullable;
+ @JsonProperty("defaultValue")
+ private String defaultValue;
+ @JsonProperty("comment")
+ private String comment;
+
+ @JsonCreator
+ public Column(@JsonProperty("name") String name,
@JsonProperty("definition") List<String> definition,
+ @JsonProperty("jdbcType") int jdbcType, @JsonProperty("position")
Position position,
+ @JsonProperty("isNullable") boolean isNullable,
@JsonProperty("defaultValue") String defaultValue,
+ @JsonProperty("comment") String comment) {
+ this.name = name;
+ this.definition = definition;
+ this.jdbcType = jdbcType;
+ this.position = position;
+ this.defaultValue = defaultValue;
+ this.comment = comment;
+ this.isNullable = isNullable;
+ }
+
+ public Column(@JsonProperty("name") String name) {
+ this.name = name;
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java
similarity index 53%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java
index c3cafc2b5..d07cdff85 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java
@@ -15,48 +15,33 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.expressions;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
import lombok.Data;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
-@Builder
+/**
+ * Position represents the position of a column in a table.
+ * It can be either before or after a specific column.
+ */
@JsonInclude(Include.NON_NULL)
@Data
-public class CanalJson {
+public class Position {
+
+ @JsonProperty("positionType")
+ private PositionType positionType;
- @JsonProperty("data")
- private List<Map<String, Object>> data;
- @JsonProperty("es")
- private long es;
- @JsonProperty("table")
- private String table;
- @JsonProperty("type")
- private String type;
- @JsonProperty("database")
- private String database;
- @JsonProperty("ts")
- private long ts;
- @JsonProperty("sql")
- private String sql;
- @JsonProperty("mysqlType")
- private Map<String, String> mysqlType;
- @JsonProperty("sqlType")
- private Map<String, Integer> sqlType;
- @JsonProperty("isDdl")
- private boolean isDdl;
- @JsonProperty("pkNames")
- private List<String> pkNames;
- @JsonProperty("schema")
- private String schema;
- @JsonProperty("oracleType")
- private Map<String, String> oracleType;
- @JsonProperty("incremental")
- private Boolean incremental;
+ @JsonProperty("columnName")
+ private String columnName;
+ @JsonCreator
+ public Position(@JsonProperty("positionType") PositionType positionType,
+ @JsonProperty("columnName") String columnName) {
+ this.positionType = positionType;
+ this.columnName = columnName;
+ }
}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/indexes/Index.java
similarity index 54%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/indexes/Index.java
index feec23042..e518865b1 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/indexes/Index.java
@@ -15,36 +15,22 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.debezium;
+package org.apache.inlong.sort.protocol.ddl.indexes;
-import io.debezium.relational.history.TableChanges;
import java.util.List;
-import java.util.Map;
-import lombok.Builder;
import lombok.Data;
+import org.apache.inlong.sort.protocol.ddl.enums.IndexType;
+/**
+ * Index for create table operation
+ */
@Data
-@Builder
-public class DebeziumJson {
+public class Index {
- private Map<String, String> before;
- private Map<String, Object> after;
- private Source source;
- private TableChanges.TableChange tableChange;
- private long tsMs;
- private String op;
- private Boolean incremental;
+ private IndexType indexType;
- @Builder
- @Data
- public static class Source {
+ private String indexName;
- private String name;
- private String db;
- private String table;
- private List<String> pkNames;
- private Map<String, Integer> sqlType;
- private Map<String, String> mysqlType;
- }
+ private List<String> indexColumns;
}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/AlterOperation.java
similarity index 54%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/AlterOperation.java
index c3cafc2b5..a04bb4c4a 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/AlterOperation.java
@@ -15,48 +15,35 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.operations;
import java.util.List;
-import java.util.Map;
-import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
-@Builder
+/**
+ * Alter operation which contains alter columns.
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("alterOperation")
@JsonInclude(Include.NON_NULL)
@Data
-public class CanalJson {
+public class AlterOperation extends Operation {
+
+ @JsonProperty("alterColumns")
+ private List<AlterColumn> alterColumns;
- @JsonProperty("data")
- private List<Map<String, Object>> data;
- @JsonProperty("es")
- private long es;
- @JsonProperty("table")
- private String table;
- @JsonProperty("type")
- private String type;
- @JsonProperty("database")
- private String database;
- @JsonProperty("ts")
- private long ts;
- @JsonProperty("sql")
- private String sql;
- @JsonProperty("mysqlType")
- private Map<String, String> mysqlType;
- @JsonProperty("sqlType")
- private Map<String, Integer> sqlType;
- @JsonProperty("isDdl")
- private boolean isDdl;
- @JsonProperty("pkNames")
- private List<String> pkNames;
- @JsonProperty("schema")
- private String schema;
- @JsonProperty("oracleType")
- private Map<String, String> oracleType;
- @JsonProperty("incremental")
- private Boolean incremental;
+ @JsonCreator
+ public AlterOperation(@JsonProperty("alterColumns") List<AlterColumn>
alterColumns) {
+ super(OperationType.ALTER);
+ this.alterColumns = alterColumns;
+ }
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java
new file mode 100644
index 000000000..63db1cd8c
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+import org.apache.inlong.sort.protocol.ddl.indexes.Index;
+
+/**
+ * CreateTableOperation represents a create table operation
+ * it can be "create table like" or "create table with columns and indexes"
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("createTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class CreateTableOperation extends Operation {
+
+ @JsonProperty("columns")
+ private List<Column> columns;
+
+ @JsonProperty("indexes")
+ private List<Index> indexes;
+
+ @JsonProperty("likeTable")
+ private String likeTable;
+
+ @JsonProperty("comment")
+ private String comment;
+
+ @JsonCreator
+ public CreateTableOperation(@JsonProperty("columns") List<Column> columns,
+ @JsonProperty("indexes") List<Index> indexes,
+ @JsonProperty("likeTable") String likeTable,
+ @JsonProperty("comment") String comment) {
+ super(OperationType.CREATE);
+ this.columns = columns;
+ this.indexes = indexes;
+ this.likeTable = likeTable;
+ this.comment = comment;
+ }
+
+ public CreateTableOperation() {
+ super(OperationType.CREATE);
+ }
+
+}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java
similarity index 52%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java
index c3cafc2b5..34921a188 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java
@@ -15,48 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.operations;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
-@Builder
+/**
+ * DropTableOperation represents a drop operation on table
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("dropTableOperation")
@JsonInclude(Include.NON_NULL)
@Data
-public class CanalJson {
+public class DropTableOperation extends Operation {
- @JsonProperty("data")
- private List<Map<String, Object>> data;
- @JsonProperty("es")
- private long es;
- @JsonProperty("table")
- private String table;
- @JsonProperty("type")
- private String type;
- @JsonProperty("database")
- private String database;
- @JsonProperty("ts")
- private long ts;
- @JsonProperty("sql")
- private String sql;
- @JsonProperty("mysqlType")
- private Map<String, String> mysqlType;
- @JsonProperty("sqlType")
- private Map<String, Integer> sqlType;
- @JsonProperty("isDdl")
- private boolean isDdl;
- @JsonProperty("pkNames")
- private List<String> pkNames;
- @JsonProperty("schema")
- private String schema;
- @JsonProperty("oracleType")
- private Map<String, String> oracleType;
- @JsonProperty("incremental")
- private Boolean incremental;
+ @JsonCreator
+ public DropTableOperation() {
+ super(OperationType.DROP);
+ }
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
new file mode 100644
index 000000000..3d44a242c
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+/**
+ * Operation represents a ddl operation.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = AlterOperation.class, name =
"alterOperation"),
+ @JsonSubTypes.Type(value = CreateTableOperation.class, name =
"createTableOperation"),
+ @JsonSubTypes.Type(value = DropTableOperation.class, name =
"dropTableOperation"),
+ @JsonSubTypes.Type(value = TruncateTableOperation.class, name =
"truncateTableOperation"),
+ @JsonSubTypes.Type(value = RenameTableOperation.class, name =
"renameTableOperation")
+})
+@Data
+@NoArgsConstructor
+public abstract class Operation {
+
+ @JsonProperty("operationType")
+ private OperationType operationType;
+
+ public Operation(@JsonProperty("operationType") OperationType type) {
+ this.operationType = type;
+ }
+
+}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java
similarity index 52%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java
index c3cafc2b5..7dc7205bc 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java
@@ -15,48 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.operations;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
-@Builder
+/**
+ * RenameTableOperation represents a rename operation on table
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("renameTableOperation")
@JsonInclude(Include.NON_NULL)
@Data
-public class CanalJson {
+public class RenameTableOperation extends Operation {
- @JsonProperty("data")
- private List<Map<String, Object>> data;
- @JsonProperty("es")
- private long es;
- @JsonProperty("table")
- private String table;
- @JsonProperty("type")
- private String type;
- @JsonProperty("database")
- private String database;
- @JsonProperty("ts")
- private long ts;
- @JsonProperty("sql")
- private String sql;
- @JsonProperty("mysqlType")
- private Map<String, String> mysqlType;
- @JsonProperty("sqlType")
- private Map<String, Integer> sqlType;
- @JsonProperty("isDdl")
- private boolean isDdl;
- @JsonProperty("pkNames")
- private List<String> pkNames;
- @JsonProperty("schema")
- private String schema;
- @JsonProperty("oracleType")
- private Map<String, String> oracleType;
- @JsonProperty("incremental")
- private Boolean incremental;
+ @JsonCreator
+ public RenameTableOperation() {
+ super(OperationType.RENAME);
+ }
}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java
similarity index 52%
copy from
inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java
index c3cafc2b5..b30af9b07 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java
@@ -15,48 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.json.canal;
+package org.apache.inlong.sort.protocol.ddl.operations;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
-@Builder
+/**
+ * TruncateTableOperation represents a truncate operation on table
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("truncateTableOperation")
@JsonInclude(Include.NON_NULL)
@Data
-public class CanalJson {
+public class TruncateTableOperation extends Operation {
- @JsonProperty("data")
- private List<Map<String, Object>> data;
- @JsonProperty("es")
- private long es;
- @JsonProperty("table")
- private String table;
- @JsonProperty("type")
- private String type;
- @JsonProperty("database")
- private String database;
- @JsonProperty("ts")
- private long ts;
- @JsonProperty("sql")
- private String sql;
- @JsonProperty("mysqlType")
- private Map<String, String> mysqlType;
- @JsonProperty("sqlType")
- private Map<String, Integer> sqlType;
- @JsonProperty("isDdl")
- private boolean isDdl;
- @JsonProperty("pkNames")
- private List<String> pkNames;
- @JsonProperty("schema")
- private String schema;
- @JsonProperty("oracleType")
- private Map<String, String> oracleType;
- @JsonProperty("incremental")
- private Boolean incremental;
+ @JsonCreator
+ public TruncateTableOperation() {
+ super(OperationType.TRUNCATE);
+ }
}
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
index 536ffb57e..9e26d58c4 100644
---
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -746,6 +746,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
emit(record, insert, tableSchema, out);
} catch (Exception e) {
LOG.error("Failed to extract DDL record {}", record, e);
+ throw new RuntimeException(e);
}
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 94f7b2c60..695c4363d 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -27,8 +27,15 @@ import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.Map.Entry;
+import java.util.Set;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.schema.Table;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.RenameTableStatement;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.base.enums.ReadPhase;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
@@ -46,6 +53,8 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getFetchTimestamp;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -57,6 +66,7 @@ import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isHighWa
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isWatermarkEvent;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.toSnapshotRecord;
+import static
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.reformatName;
/**
* The {@link RecordEmitter} implementation for {@link MySqlSourceReader}.
@@ -87,6 +97,7 @@ public final class MySqlRecordEmitter<T>
private volatile long snapProcessTime = 0L;
private boolean includeIncremental;
+ public final ObjectMapper objectMapper = new ObjectMapper();
public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
@@ -126,8 +137,9 @@ public final class MySqlRecordEmitter<T>
if (tableChanges.isEmpty()) {
TableId tableId = RecordUtils.getTableId(element);
- // if this table is one of the captured tables, output the ddl
element
- if
(splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)) {
+ // if this table is one of the captured tables, output the ddl
element.
+ if
(splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)
+ || shouldOutputRenameDdl(element, tableId)) {
outputDdlElement(element, output, splitState, null);
}
}
@@ -185,6 +197,34 @@ public final class MySqlRecordEmitter<T>
}
}
+ /**
+ * if rename operation is "rename a to b" where a is the captured table
+ * this method extract table names a and b, if any of table name is the
captured table
+ * we should output ddl element
+ */
+ private boolean shouldOutputRenameDdl(SourceRecord element, TableId
tableId) {
+ try {
+ String ddl = objectMapper.readTree(((Struct)
element.value()).get(HISTORY_RECORD_FIELD).toString())
+ .get(DDL_FIELD_NAME).asText();
+ Statement statement = CCJSqlParserUtil.parse(ddl);
+ if (statement instanceof RenameTableStatement) {
+ RenameTableStatement renameTableStatement =
(RenameTableStatement) statement;
+ Set<Entry<Table, Table>> tableNames =
renameTableStatement.getTableNames();
+ for (Entry<Table, Table> entry : tableNames) {
+ Table oldTable = entry.getKey();
+ Table newTable = entry.getValue();
+ if
(reformatName(oldTable.getName()).equals(tableId.table()) ||
+
reformatName(newTable.getName()).equals(tableId.table())) {
+ return true;
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("parse ddl error {}", element, e);
+ }
+ return false;
+ }
+
private void updateSnapshotRecord(SourceRecord element, MySqlSplitState
splitState) {
if (splitState.isSnapshotSplitState() && includeIncremental) {
toSnapshotRecord(element);
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index a44993891..498244e45 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -17,17 +17,16 @@
package org.apache.inlong.sort.cdc.mysql.table;
-import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
-
-import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static
org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getCanalData;
+import static
org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getDebeziumData;
+import static org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getMetaData;
+import static org.apache.inlong.sort.cdc.mysql.utils.MetaDataUtils.getOpType;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
-import io.debezium.relational.history.TableChanges.TableChange;
-import java.util.LinkedHashMap;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericArrayData;
@@ -38,17 +37,11 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.inlong.sort.cdc.base.debezium.table.MetadataConverter;
-import org.apache.inlong.sort.cdc.base.util.RecordUtils;
-import org.apache.inlong.sort.formats.json.canal.CanalJson;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/**
@@ -163,28 +156,7 @@ public enum MySqlReadableMetadata {
@Override
public Object read(SourceRecord record,
@Nullable TableChanges.TableChange tableSchema,
RowData rowData) {
- // construct debezium json
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct =
messageStruct.getStruct(FieldName.SOURCE);
- GenericRowData data = (GenericRowData) rowData;
- Map<String, Object> field = (Map<String, Object>)
data.getField(0);
-
- Source source = Source.builder().db(getMetaData(record,
AbstractSourceInfo.DATABASE_NAME_KEY))
- .table(getMetaData(record,
AbstractSourceInfo.TABLE_NAME_KEY))
-
.name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
- .sqlType(getSqlType(tableSchema))
- .pkNames(getPkNames(tableSchema))
- .mysqlType(getMysqlType(tableSchema))
- .build();
- DebeziumJson debeziumJson =
DebeziumJson.builder().after(field).source(source)
-
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
-
.tableChange(tableSchema).incremental(isSnapshotRecord(sourceStruct)).build();
-
- try {
- return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
- } catch (Exception e) {
- throw new IllegalStateException("exception occurs when
get meta data", e);
- }
+ return getDebeziumData(record, tableSchema,
(GenericRowData) rowData);
}
}),
@@ -436,46 +408,6 @@ public enum MySqlReadableMetadata {
}
});
- private static StringData getCanalData(SourceRecord record, GenericRowData
rowData,
- TableChange tableSchema) {
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
- // tableName
- String tableName = getMetaData(record,
AbstractSourceInfo.TABLE_NAME_KEY);
- // databaseName
- String databaseName = getMetaData(record,
AbstractSourceInfo.DATABASE_NAME_KEY);
- // opTs
- long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
- // actual data
- GenericRowData data = rowData;
- Map<String, Object> field = (Map<String, Object>) data.getField(0);
- List<Map<String, Object>> dataList = new ArrayList<>();
-
- CanalJson canalJson = CanalJson.builder()
- .database(databaseName)
- .es(opTs).pkNames(getPkNames(tableSchema))
- .mysqlType(getMysqlType(tableSchema)).table(tableName)
- .type(getCanalOpType(rowData)).sqlType(getSqlType(tableSchema))
- .incremental(isSnapshotRecord(sourceStruct)).build();
-
- try {
- if (RecordUtils.isDdlRecord(messageStruct)) {
- canalJson.setSql((String) field.get(DDL_FIELD_NAME));
- canalJson.setDdl(true);
- canalJson.setData(dataList);
- } else {
- canalJson.setDdl(false);
- canalJson.setTs((Long) messageStruct.get(FieldName.TIMESTAMP));
- dataList.add(field);
- canalJson.setData(dataList);
- }
- return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
- } catch (Exception e) {
- throw new IllegalStateException("exception occurs when get meta
data", e);
- }
-
- }
-
private final String key;
private final DataType dataType;
private final MetadataConverter converter;
@@ -487,103 +419,6 @@ public enum MySqlReadableMetadata {
this.converter = converter;
}
- private static String getOpType(SourceRecord record) {
- String opType;
- final Envelope.Operation op = Envelope.operationFor(record);
- if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
- opType = "INSERT";
- } else if (op == Envelope.Operation.DELETE) {
- opType = "DELETE";
- } else {
- opType = "UPDATE";
- }
- return opType;
- }
-
- private static String getCanalOpType(GenericRowData record) {
- String opType;
- switch (record.getRowKind()) {
- case DELETE:
- case UPDATE_BEFORE:
- opType = "DELETE";
- break;
- case INSERT:
- case UPDATE_AFTER:
- opType = "INSERT";
- break;
- default:
- throw new IllegalStateException("the record only have states
in DELETE, "
- + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
- }
- return opType;
- }
-
- private static String getDebeziumOpType(GenericRowData record) {
- String opType;
- switch (record.getRowKind()) {
- case DELETE:
- case UPDATE_BEFORE:
- opType = "d";
- break;
- case INSERT:
- case UPDATE_AFTER:
- opType = "c";
- break;
- default:
- throw new IllegalStateException("the record only have states
in DELETE, "
- + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
- }
- return opType;
- }
-
- private static List<String> getPkNames(@Nullable TableChanges.TableChange
tableSchema) {
- if (tableSchema == null) {
- return null;
- }
- return tableSchema.getTable().primaryKeyColumnNames();
- }
-
- public static Map<String, String> getMysqlType(@Nullable
TableChanges.TableChange tableSchema) {
- if (tableSchema == null) {
- return null;
- }
- Map<String, String> mysqlType = new LinkedHashMap<>();
- final Table table = tableSchema.getTable();
- table.columns()
- .forEach(
- column -> {
- mysqlType.put(
- column.name(),
- String.format(
- "%s(%d)",
- column.typeName(),
- column.length()));
- });
- return mysqlType;
- }
-
- /**
- * get sql type from table schema, represents the jdbc data type
- *
- * @param tableSchema table schema
- */
- public static Map<String, Integer> getSqlType(@Nullable
TableChanges.TableChange tableSchema) {
- if (tableSchema == null) {
- return null;
- }
- Map<String, Integer> sqlType = new LinkedHashMap<>();
- final Table table = tableSchema.getTable();
- table.columns().forEach(
- column -> sqlType.put(column.name(), column.jdbcType()));
- return sqlType;
- }
-
- public static String getMetaData(SourceRecord record, String tableNameKey)
{
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
- return sourceStruct.getString(tableNameKey);
- }
-
public String getKey() {
return key;
}
@@ -595,4 +430,4 @@ public enum MySqlReadableMetadata {
public MetadataConverter getConverter() {
return converter;
}
-}
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
new file mode 100644
index 000000000..50ad44d2b
--- /dev/null
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.utils;
+
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
+import static
org.apache.inlong.sort.cdc.mysql.utils.OperationUtils.generateOperation;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.inlong.sort.cdc.base.util.RecordUtils;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils for generating metadata in mysql cdc.
+ */
+public class MetaDataUtils {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MetaDataUtils.class);
+
+ /**
+ * get sql type from table schema, represents the jdbc data type
+ *
+ * @param tableSchema table schema
+ */
+ public static Map<String, Integer> getSqlType(@Nullable
TableChanges.TableChange tableSchema) {
+ if (tableSchema == null) {
+ return null;
+ }
+ Map<String, Integer> sqlType = new LinkedHashMap<>();
+ final Table table = tableSchema.getTable();
+ table.columns().forEach(
+ column -> sqlType.put(column.name(), column.jdbcType()));
+ return sqlType;
+ }
+
+ public static String getMetaData(SourceRecord record, String tableNameKey)
{
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+ return sourceStruct.getString(tableNameKey);
+ }
+
+ public static String getOpType(SourceRecord record) {
+ String opType;
+ final Envelope.Operation op = Envelope.operationFor(record);
+ if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+ opType = "INSERT";
+ } else if (op == Envelope.Operation.DELETE) {
+ opType = "DELETE";
+ } else {
+ opType = "UPDATE";
+ }
+ return opType;
+ }
+
+ public static String getCanalOpType(GenericRowData record) {
+ String opType;
+ switch (record.getRowKind()) {
+ case DELETE:
+ case UPDATE_BEFORE:
+ opType = "DELETE";
+ break;
+ case INSERT:
+ case UPDATE_AFTER:
+ opType = "INSERT";
+ break;
+ default:
+ throw new IllegalStateException("the record only have states
in DELETE, "
+ + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+ }
+ return opType;
+ }
+
+ public static String getDebeziumOpType(GenericRowData record) {
+ String opType;
+ switch (record.getRowKind()) {
+ case DELETE:
+ case UPDATE_BEFORE:
+ opType = "d";
+ break;
+ case INSERT:
+ case UPDATE_AFTER:
+ opType = "c";
+ break;
+ default:
+ throw new IllegalStateException("the record only have states
in DELETE, "
+ + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+ }
+ return opType;
+ }
+
+ public static List<String> getPkNames(@Nullable TableChanges.TableChange
tableSchema) {
+ if (tableSchema == null) {
+ return null;
+ }
+ return tableSchema.getTable().primaryKeyColumnNames();
+ }
+
+ public static Map<String, String> getMysqlType(@Nullable
TableChanges.TableChange tableSchema) {
+ if (tableSchema == null) {
+ return null;
+ }
+ Map<String, String> mysqlType = new LinkedHashMap<>();
+ final Table table = tableSchema.getTable();
+ table.columns()
+ .forEach(
+ column -> {
+ mysqlType.put(
+ column.name(),
+ String.format(
+ "%s(%d)",
+ column.typeName(),
+ column.length()));
+ });
+ return mysqlType;
+ }
+
+ public static StringData getCanalData(SourceRecord record, GenericRowData
rowData,
+ TableChange tableSchema) {
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+ // tableName
+ String tableName = getMetaData(record,
AbstractSourceInfo.TABLE_NAME_KEY);
+ // databaseName
+ String databaseName = getMetaData(record,
AbstractSourceInfo.DATABASE_NAME_KEY);
+ // opTs
+ long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
+ // actual data
+ Map<String, Object> field = (Map<String, Object>) rowData.getField(0);
+ List<Map<String, Object>> dataList = new ArrayList<>();
+
+ CanalJson canalJson = CanalJson.builder()
+ .database(databaseName)
+ .es(opTs).pkNames(getPkNames(tableSchema))
+ .mysqlType(getMysqlType(tableSchema)).table(tableName)
+ .incremental(!isSnapshotRecord(sourceStruct))
+
.dataSourceName(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
+ .type(getCanalOpType(rowData))
+ .sqlType(getSqlType(tableSchema))
+ .build();
+
+ try {
+ if (RecordUtils.isDdlRecord(messageStruct)) {
+ String sql = (String) field.get(DDL_FIELD_NAME);
+ canalJson.setSql(sql);
+ canalJson.setOperation(generateOperation(sql,
getSqlType(tableSchema)));
+ canalJson.setDdl(true);
+ canalJson.setData(dataList);
+ } else {
+ canalJson.setDdl(false);
+ canalJson.setTs((Long) messageStruct.get(FieldName.TIMESTAMP));
+ dataList.add(field);
+ canalJson.setData(dataList);
+ }
+ LOG.debug("canal json: {}", canalJson);
+ return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
+ } catch (Exception e) {
+ throw new IllegalStateException("exception occurs when get meta
data", e);
+ }
+ }
+
+ public static StringData getDebeziumData(SourceRecord record, TableChange
tableSchema,
+ GenericRowData rowData) {
+ // construct debezium json
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+ GenericRowData data = rowData;
+ Map<String, Object> field = (Map<String, Object>) data.getField(0);
+
+ Source source = Source.builder().db(getMetaData(record,
AbstractSourceInfo.DATABASE_NAME_KEY))
+ .table(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY))
+
.name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
+ .sqlType(getSqlType(tableSchema))
+ .pkNames(getPkNames(tableSchema))
+ .mysqlType(getMysqlType(tableSchema))
+ .build();
+ DebeziumJson debeziumJson = DebeziumJson.builder().source(source)
+
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
+
.dataSourceName(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
+
.tableChange(tableSchema).incremental(!isSnapshotRecord(sourceStruct)).build();
+
+ try {
+ if (RecordUtils.isDdlRecord(messageStruct)) {
+ String sql = (String) field.get(DDL_FIELD_NAME);
+ debeziumJson.setDdl(sql);
+ debeziumJson.setOperation(generateOperation(sql,
getSqlType(tableSchema)));
+ debeziumJson.setAfter(new HashMap<>());
+ } else {
+ debeziumJson.setAfter(field);
+ }
+ return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
+ } catch (Exception e) {
+ throw new IllegalStateException("exception occurs when get meta
data {}", e);
+ }
+
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
new file mode 100644
index 000000000..51c2533a0
--- /dev/null
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.utils;
+
+import static
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.parseColumnWithPosition;
+import static
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.parseColumns;
+import static
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.parseComment;
+import static
org.apache.inlong.sort.protocol.ddl.Utils.ColumnUtils.reformatName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.alter.RenameTableStatement;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import net.sf.jsqlparser.statement.drop.Drop;
+import net.sf.jsqlparser.statement.truncate.Truncate;
+import org.apache.commons.lang.StringUtils;
+import
org.apache.inlong.sort.cdc.base.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.IndexType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.indexes.Index;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.DropTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.ddl.operations.RenameTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.TruncateTableOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils for generate operation from statement from sqlParser.
+ */
+public class OperationUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RowDataDebeziumDeserializeSchema.class);
+ public static final String PRIMARY_KEY = "PRIMARY KEY";
+ public static final String NORMAL_INDEX = "NORMAL_INDEX";
+ public static final String FIRST = "FIRST";
+
+ /**
+ * generate operation from sql and table schema.
+ * @param sql sql from binlog
+ * @param sqlType table sql types
+ * @return Operation
+ */
+ public static Operation generateOperation(String sql, Map<String, Integer>
sqlType) {
+ try {
+ // now sqlParser don't support first position
+ // remove it first and add it later
+ boolean endsWithFirst = StringUtils.endsWithIgnoreCase(sql, FIRST);
+ if (endsWithFirst) {
+ sql = removeFirstFlag(sql);
+ }
+ Statement statement = CCJSqlParserUtil.parse(sql);
+ if (statement instanceof Alter) {
+ return parseAlterOperation(
+ (Alter) statement, sqlType, endsWithFirst);
+ } else if (statement instanceof CreateTable) {
+ return parseCreateTableOperation(
+ (CreateTable) statement, sqlType);
+ } else if (statement instanceof Drop) {
+ return new DropTableOperation();
+ } else if (statement instanceof Truncate) {
+ return new TruncateTableOperation();
+ } else if (statement instanceof RenameTableStatement) {
+ return new RenameTableOperation();
+ } else {
+ LOG.warn("doesn't support sql {}, statement {}", sql,
statement);
+ }
+ } catch (Exception e) {
+ LOG.error("parse ddl error: {}, set ddl to null", sql, e);
+ }
+ return null;
+ }
+
+ /**
+ * parse alter operation from Alter from sqlParser.
+ * @param statement alter statement
+ * @param sqlType sql types
+ * @param isFirst whether the column is first
+ * @return AlterOperation
+ */
+ private static AlterOperation parseAlterOperation(Alter statement,
+ Map<String, Integer> sqlType, boolean isFirst) {
+
+ List<AlterColumn> alterColumns = new ArrayList<>();
+ statement.getAlterExpressions().forEach(alterExpression -> {
+ switch (alterExpression.getOperation()) {
+ case DROP:
+ alterColumns.add(new AlterColumn(AlterType.DROP_COLUMN,
+ null,
+
Column.builder().name(reformatName(alterExpression.getColumnName()))
+ .build()));
+ break;
+ case ADD:
+ alterColumns.add(new AlterColumn(AlterType.ADD_COLUMN,
+ parseColumnWithPosition(isFirst, sqlType,
+
alterExpression.getColDataTypeList().get(0)),
+ null));
+ break;
+ case RENAME:
+ alterColumns.add(new AlterColumn(AlterType.CHANGE_COLUMN,
+ new
Column(reformatName(alterExpression.getColumnName())),
+ new
Column(reformatName(alterExpression.getColumnOldName()))));
+ break;
+ case MODIFY:
+ // modify column use change column type
+ case CHANGE:
+ alterColumns.add(new AlterColumn(AlterType.CHANGE_COLUMN,
+ parseColumnWithPosition(isFirst, sqlType,
+
alterExpression.getColDataTypeList().get(0)),
+ new
Column(reformatName(alterExpression.getColumnOldName()))));
+ break;
+ default:
+ LOG.warn("doesn't support alter operation {}, statement
{}",
+ alterExpression.getOperation(), statement);
+ }
+
+ });
+
+ return new AlterOperation(alterColumns);
+ }
+
+ /**
+ * parse create table operation from CreateTable from sqlParser.
+ * @param statement create table statement
+ * @return CreateTableOperation
+ */
+ private static CreateTableOperation parseCreateTableOperation(
+ CreateTable statement, Map<String, Integer> sqlType) {
+
+ CreateTableOperation createTableOperation = new CreateTableOperation();
+ List<ColumnDefinition> columnDefinitions =
statement.getColumnDefinitions();
+
+ if (statement.getLikeTable() != null) {
+ createTableOperation.setLikeTable(parseLikeTable(statement));
+ return createTableOperation;
+ }
+
+ createTableOperation.setColumns(parseColumns(sqlType,
columnDefinitions));
+ createTableOperation.setIndexes(parseIndexes(statement));
+
createTableOperation.setComment(parseComment(statement.getTableOptionsStrings()));
+
+ return createTableOperation;
+ }
+
+ /**
+ * parse indexes from statement
+ * only support primary key and normal index.
+ * @param statement create table statement
+ * @return list of indexes
+ */
+ private static List<Index> parseIndexes(CreateTable statement) {
+
+ if (statement.getIndexes() == null) {
+ return new ArrayList<>();
+ }
+ List<Index> indexList = new ArrayList<>();
+
+ for (net.sf.jsqlparser.statement.create.table.Index perIndex :
statement.getIndexes()) {
+ Index index = new Index();
+ switch (perIndex.getType()) {
+ case PRIMARY_KEY:
+ index.setIndexType(IndexType.PRIMARY_KEY);
+ break;
+ case NORMAL_INDEX:
+ index.setIndexType(IndexType.NORMAL_INDEX);
+ break;
+ default:
+ LOG.error("unsupported index type {}", perIndex.getType());
+ break;
+ }
+ List<String> columns = new ArrayList<>();
+ perIndex.getColumnsNames().forEach(columnName ->
columns.add(reformatName(columnName)));
+ index.setIndexName(reformatName(perIndex.getName()));
+ index.setIndexColumns(columns);
+ indexList.add(index);
+ }
+
+ return indexList;
+
+ }
+
+ /**
+ * remove the first flag from sql.
+ * @param sql sql from binlog
+ * @return sql without first flag
+ */
+ private static String removeFirstFlag(String sql) {
+ return sql.substring(0, StringUtils.lastIndexOfIgnoreCase(sql, FIRST));
+ }
+
+ /**
+ * get like table from statement.
+ * @param statement create table statement
+ * @return like table name
+ */
+ private static String parseLikeTable(CreateTable statement) {
+ if (statement.getLikeTable() != null) {
+ return statement.getLikeTable().getName();
+ }
+ return "";
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
b/inlong-sort/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
new file mode 100644
index 000000000..fdc990fa7
--- /dev/null
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc;
+
+import java.util.HashMap;
+import org.apache.inlong.sort.cdc.mysql.utils.OperationUtils;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for table operations
+ */
+public class TestOperation {
+
+ @Test
+ public void testRenameTableOperation() {
+ String sql = "rename table `tv3` to `tv4`";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ sqlType.put("tv3", 1);
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertEquals(operation.getOperationType(),
OperationType.RENAME);
+ }
+
+ @Test
+ public void testDropTableOperation() {
+ String sql = "drop table `tv3`";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ sqlType.put("tv3", 1);
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertEquals(operation.getOperationType(), OperationType.DROP);
+ }
+
+ @Test
+ public void testAddColumnOperation() {
+ String sql = "alter table a add column b int comment \"test\" first";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ sqlType.put("b", 1);
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertEquals(operation.getOperationType(), OperationType.ALTER);
+ AlterColumn alterColumn = ((AlterOperation)
operation).getAlterColumns().get(0);
+ Assert.assertEquals(alterColumn.getAlterType(), AlterType.ADD_COLUMN);
+ Assert.assertEquals(alterColumn.getNewColumn().getName(), "b");
+
Assert.assertEquals(alterColumn.getNewColumn().getPosition().getPositionType(),
PositionType.FIRST);
+
Assert.assertNull(alterColumn.getNewColumn().getPosition().getColumnName());
+ }
+
+ @Test
+ public void testRenameColumnOperation() {
+ String sql = "alter table a CHANGE b c int";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ sqlType.put("c", 1);
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertEquals(operation.getOperationType(), OperationType.ALTER);
+ AlterColumn alterColumn = ((AlterOperation)
operation).getAlterColumns().get(0);
+ Assert.assertEquals(alterColumn.getAlterType(),
AlterType.CHANGE_COLUMN);
+ Assert.assertEquals(alterColumn.getNewColumn().getName(), "c");
+ Assert.assertEquals(alterColumn.getOldColumn().getName(), "b");
+ }
+
+}
diff --git a/inlong-sort/sort-formats/format-json/pom.xml
b/inlong-sort/sort-formats/format-json/pom.xml
index a108c1eaf..93228f370 100644
--- a/inlong-sort/sort-formats/format-json/pom.xml
+++ b/inlong-sort/sort-formats/format-json/pom.xml
@@ -78,7 +78,11 @@
<artifactId>debezium-core</artifactId>
<scope>provided</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
index c3cafc2b5..896299f69 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
@@ -19,13 +19,18 @@ package org.apache.inlong.sort.formats.json.canal;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Data;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
@Builder
+@JsonTypeName("canalJson")
@JsonInclude(Include.NON_NULL)
@Data
public class CanalJson {
@@ -56,7 +61,46 @@ public class CanalJson {
private String schema;
@JsonProperty("oracleType")
private Map<String, String> oracleType;
+ @JsonProperty("operation")
+ private Operation operation;
@JsonProperty("incremental")
private Boolean incremental;
+ @JsonProperty("dataSourceName")
+ private String dataSourceName;
+
+ @JsonCreator
+ public CanalJson(@Nullable @JsonProperty("data") List<Map<String, Object>>
data,
+ @JsonProperty("es") long es,
+ @JsonProperty("table") String table,
+ @JsonProperty("type") String type,
+ @JsonProperty("database") String database,
+ @JsonProperty("ts") long ts,
+ @JsonProperty("sql") String sql,
+ @Nullable @JsonProperty("mysqlType") Map<String, String> mysqlType,
+ @Nullable @JsonProperty("sqlType") Map<String, Integer> sqlType,
+ @JsonProperty("isDdl") boolean isDdl,
+ @Nullable @JsonProperty("pkNames") List<String> pkNames,
+ @JsonProperty("schema") String schema,
+ @Nullable @JsonProperty("oracleType") Map<String, String>
oracleType,
+ @JsonProperty("operation") Operation operation,
+ @JsonProperty("incremental") Boolean incremental,
+ @JsonProperty("dataSourceName") String dataSourceName) {
+ this.data = data;
+ this.es = es;
+ this.table = table;
+ this.type = type;
+ this.database = database;
+ this.ts = ts;
+ this.sql = sql;
+ this.mysqlType = mysqlType;
+ this.sqlType = sqlType;
+ this.isDdl = isDdl;
+ this.pkNames = pkNames;
+ this.schema = schema;
+ this.oracleType = oracleType;
+ this.operation = operation;
+ this.incremental = incremental;
+ this.dataSourceName = dataSourceName;
+ }
}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
index feec23042..d736d2467 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
+++
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
@@ -18,22 +18,64 @@
package org.apache.inlong.sort.formats.json.debezium;
import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
import java.util.List;
import java.util.Map;
import lombok.Builder;
import lombok.Data;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
-@Data
@Builder
+@JsonTypeName("canalJson")
+@JsonInclude(Include.NON_NULL)
+@Data
public class DebeziumJson {
+ @JsonProperty("before")
private Map<String, String> before;
+ @JsonProperty("after")
private Map<String, Object> after;
+ @JsonProperty("source")
private Source source;
+ @JsonProperty("tableChange")
private TableChanges.TableChange tableChange;
+ @JsonProperty("tsMs")
private long tsMs;
+ @JsonProperty("op")
private String op;
- private Boolean incremental;
+ @JsonProperty("incremental")
+ private boolean incremental;
+ @JsonProperty("ddl")
+ private String ddl;
+ @JsonProperty("operation")
+ private Operation operation;
+ @JsonProperty("dataSourceName")
+ private String dataSourceName;
+
+ public DebeziumJson(@JsonProperty("before") Map<String, String> before,
+ @JsonProperty("after") Map<String, Object> after,
+ @JsonProperty("source") Source source,
+ @JsonProperty("tableChange") TableChange tableChange,
+ @JsonProperty("tsMs") long tsMs, @JsonProperty("op") String op,
+ @JsonProperty("incremental") boolean incremental,
+ @JsonProperty("ddl") String ddl,
+ @JsonProperty("operation") Operation operation,
+ @JsonProperty("dataSourceName") String dataSourceName) {
+ this.before = before;
+ this.after = after;
+ this.source = source;
+ this.tableChange = tableChange;
+ this.tsMs = tsMs;
+ this.op = op;
+ this.incremental = incremental;
+ this.ddl = ddl;
+ this.operation = operation;
+ this.dataSourceName = dataSourceName;
+ }
@Builder
@Data
diff --git
a/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationTest.java
b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationTest.java
new file mode 100644
index 000000000..2384e2162
--- /dev/null
+++
b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import java.util.ArrayList;
+import java.util.List;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for {@link CanalJson}.
+ */
+public class CanalJsonSerializationTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CanalJsonSerializationTest.class);
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Test
+ public void testCanalJsonSerialization() {
+
+ List<AlterColumn> alterColumns = new ArrayList<>();
+
+ Column column = new Column("columnDataType.getColumnName()", new
ArrayList<>(),
+ 1,
+ new Position(PositionType.FIRST, null), true, "23",
+ "23");
+
+ alterColumns.add(new AlterColumn(AlterType.ADD_COLUMN, column, null));
+
+ AlterOperation alterOperation = new AlterOperation(alterColumns);
+
+ CanalJson canalJson = CanalJson.builder()
+ .data(null)
+ .es(0)
+ .table("table")
+ .type("type")
+ .database("database")
+ .ts(0)
+ .sql("sql")
+ .mysqlType(null)
+ .sqlType(null)
+ .pkNames(null)
+ .schema("schema")
+ .oracleType(null)
+ .operation(alterOperation)
+ .incremental(false)
+ .build();
+
+ ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ try {
+ String writeValueAsString =
OBJECT_MAPPER.writeValueAsString(canalJson);
+ LOG.info(writeValueAsString);
+ objectMapper.readValue(writeValueAsString, CanalJson.class);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git
a/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/DebeziumJsonSerializationTest.java
b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/DebeziumJsonSerializationTest.java
new file mode 100644
index 000000000..c3f4bdd71
--- /dev/null
+++
b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/DebeziumJsonSerializationTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for {@link DebeziumJson}.
+ */
+public class DebeziumJsonSerializationTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CanalJsonSerializationTest.class);
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Test
+ public void testDebeziumJsonSerialization() {
+
+ List<AlterColumn> alterColumns = new ArrayList<>();
+
+ Column column = new Column("columnDataType.getColumnName()", new
ArrayList<>(),
+ 1,
+ new Position(PositionType.FIRST, null), true, "23",
+ "23");
+
+ alterColumns.add(new AlterColumn(AlterType.ADD_COLUMN, column, null));
+
+ AlterOperation alterOperation = new AlterOperation(alterColumns);
+
+ DebeziumJson debeziumJson = DebeziumJson.builder().source(null)
+ .dataSourceName("dataSourceName")
+ .tableChange(null).incremental(false).build();
+
+ debeziumJson.setDdl("");
+ debeziumJson.setOperation(alterOperation);
+ debeziumJson.setAfter(new HashMap<>());
+
+ ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ try {
+ String writeValueAsString =
OBJECT_MAPPER.writeValueAsString(debeziumJson);
+ LOG.info(writeValueAsString);
+ objectMapper.readValue(writeValueAsString, DebeziumJson.class);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 057dc5d77..40ab5780a 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -873,7 +873,7 @@ The text of each license is the standard Apache 2.0 license.
net.sf.jpam:jpam:1.1 - JPam (http://jpam.sourceforge.net/), (The Apache
Software License, Version 2.0)
com.cedarsoftware:json-io:2.5.1 - Java JSON serialization
(https://github.com/jdereg/json-io), (Apache License, Version 2.0)
net.minidev:json-smart:2.3 - JSON Small and Fast Parser
(https://github.com/netplex/json-smart-v2/tree/v2.3), (The Apache Software
License, Version 2.0)
- com.github.jsqlparser:jsqlparser:2.1 - JSQLParser library
(https://github.com/JSQLParser/JSqlParser), (The Apache Software License,
Version 2.0; GNU Library or Lesser General Public License (LGPL) V2.1)
+ com.github.jsqlparser:jsqlparser:4.2 - JSQLParser library
(https://github.com/JSQLParser/JSqlParser), (The Apache Software License,
Version 2.0; GNU Library or Lesser General Public License (LGPL) V2.1)
org.apache.kerby:kerb-admin:1.0.1 - Kerby-kerb Admin
(http://directory.apache.org/kerby/kerby-kerb/kerb-admin), (Apache License,
Version 2.0)
org.apache.kerby:kerb-client:1.0.1 - Kerby-kerb Client
(http://directory.apache.org/kerby/kerby-kerb/kerb-client), (Apache License,
Version 2.0)
org.apache.kerby:kerb-common:1.0.1 - Kerby-kerb Common
(http://directory.apache.org/kerby/kerby-kerb/kerb-common), (Apache License,
Version 2.0)
diff --git a/pom.xml b/pom.xml
index 62e5a5ad0..56dbf6dfb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -208,6 +208,7 @@
<tomcat.version>8.5.53</tomcat.version>
<jedis.version>2.9.0</jedis.version>
<poi.version>5.2.3</poi.version>
+ <jsqlparser.version>4.2</jsqlparser.version>
</properties>
<dependencyManagement>
@@ -233,6 +234,11 @@
<artifactId>flume-ng-configuration</artifactId>
<version>${flume.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.jsqlparser</groupId>
+ <artifactId>jsqlparser</artifactId>
+ <version>${jsqlparser.version}</version>
+ </dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>