This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ba0273087 [cdc] Parse primary key and schema by historyRecord (#1621)
ba0273087 is described below

commit ba0273087de40accec5ebca390f6e83bd8bbf048
Author: JunZhang <[email protected]>
AuthorDate: Mon Jul 24 11:35:08 2023 +0800

    [cdc] Parse primary key and schema by historyRecord (#1621)
---
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 37 +++++-----
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  | 79 +++++++++-------------
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 17 +++--
 3 files changed, 64 insertions(+), 69 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index f5a3d8b53..67a289880 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -35,15 +35,11 @@ import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
 
-import org.apache.paimon.shade.guava30.com.google.common.base.Strings;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import com.alibaba.druid.sql.SQLUtils;
-import com.alibaba.druid.sql.ast.SQLStatement;
-import 
com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import com.alibaba.druid.util.JdbcConstants;
+import io.debezium.relational.history.TableChanges;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,7 +74,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     private final boolean caseSensitive;
     private final TableNameConverter tableNameConverter;
     private final List<ComputedColumn> computedColumns;
-    private final NewTableSchemaBuilder<MySqlCreateTableStatement> 
schemaBuilder;
+    private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
     @Nullable private final Pattern includingPattern;
     @Nullable private final Pattern excludingPattern;
     private final Set<String> includedTables = new HashSet<>();
@@ -111,7 +107,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             ZoneId serverTimeZone,
             boolean caseSensitive,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+            NewTableSchemaBuilder<JsonNode> schemaBuilder,
             @Nullable Pattern includingPattern,
             @Nullable Pattern excludingPattern,
             boolean convertTinyint1ToBool) {
@@ -131,7 +127,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+            NewTableSchemaBuilder<JsonNode> schemaBuilder,
             @Nullable Pattern includingPattern,
             @Nullable Pattern excludingPattern,
             boolean convertTinyint1ToBool) {
@@ -231,20 +227,25 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
 
         try {
             String historyRecordString = historyRecord.asText();
-            String ddl = 
objectMapper.readTree(historyRecordString).get("ddl").asText();
-            if (Strings.isNullOrEmpty(ddl)) {
-                return Optional.empty();
+            JsonNode tableChanges = 
objectMapper.readTree(historyRecordString).get("tableChanges");
+            if (tableChanges.size() != 1) {
+                throw new IllegalArgumentException(
+                        "Invalid historyRecord, because tableChanges should 
contain exactly 1 item.\n"
+                                + historyRecord.asText());
             }
 
-            SQLStatement statement = SQLUtils.parseSingleStatement(ddl, 
JdbcConstants.MYSQL);
-            if (!(statement instanceof MySqlCreateTableStatement)) {
+            JsonNode tableChange = tableChanges.get(0);
+            if (!tableChange
+                    .get("type")
+                    .asText()
+                    .equals(TableChanges.TableChangeType.CREATE.name())) {
                 return Optional.empty();
             }
 
-            MySqlCreateTableStatement createTableStatement = 
(MySqlCreateTableStatement) statement;
-            List<String> primaryKeys = 
createTableStatement.getPrimaryKeyNames();
-            String tableName = createTableStatement.getTableName();
-            if (primaryKeys.isEmpty()) {
+            JsonNode primaryKeyColumnNames = 
tableChange.get("table").get("primaryKeyColumnNames");
+            if (primaryKeyColumnNames.size() == 0) {
+                String id = tableChange.get("id").asText();
+                String tableName = id.replaceAll("\"", "").split("\\.")[1];
                 LOG.debug(
                         "Didn't find primary keys from MySQL DDL for table 
'{}'. "
                                 + "This table won't be synchronized.",
@@ -254,7 +255,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                 return Optional.empty();
             }
 
-            return schemaBuilder.build(createTableStatement);
+            return schemaBuilder.build(tableChange);
         } catch (Exception e) {
             LOG.info("Failed to parse history record for schema changes", e);
             return Optional.empty();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 0d9561fef..e7176c6fa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -22,16 +22,10 @@ import 
org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
 
-import com.alibaba.druid.sql.ast.SQLDataType;
-import com.alibaba.druid.sql.ast.SQLExpr;
-import com.alibaba.druid.sql.ast.SQLName;
-import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
-import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
-import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
-import com.alibaba.druid.sql.ast.statement.SQLTableElement;
-import 
com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +36,7 @@ import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CO
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Schema builder for MySQL cdc. */
-public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<MySqlCreateTableStatement> {
+public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode> {
 
     private final Map<String, String> tableConfig;
     private final boolean caseSensitive;
@@ -53,50 +47,41 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<MySqlCreat
     }
 
     @Override
-    public Optional<Schema> build(MySqlCreateTableStatement statement) {
-        List<SQLTableElement> columns = statement.getTableElementList();
-        LinkedHashMap<String, Tuple2<DataType, String>> fields = new 
LinkedHashMap<>();
-
-        for (SQLTableElement element : columns) {
-            if (element instanceof SQLColumnDefinition) {
-                SQLColumnDefinition column = (SQLColumnDefinition) element;
-                SQLName name = column.getName();
-                SQLDataType dataType = column.getDataType();
-                List<SQLExpr> arguments = dataType.getArguments();
-                Integer precision = null;
-                Integer scale = null;
-                if (arguments.size() >= 1) {
-                    precision = (int) (((SQLIntegerExpr) 
arguments.get(0)).getValue());
-                }
-
-                if (arguments.size() >= 2) {
-                    scale = (int) (((SQLIntegerExpr) 
arguments.get(1)).getValue());
-                }
-
-                SQLCharExpr comment = (SQLCharExpr) column.getComment();
-                fields.put(
-                        name.getSimpleName(),
-                        Tuple2.of(
-                                MySqlTypeUtils.toDataType(
-                                        column.getDataType().getName(),
-                                        precision,
-                                        scale,
-                                        
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue()),
-                                comment == null ? null : 
String.valueOf(comment.getValue())));
-            }
+    public Optional<Schema> build(JsonNode tableChange) {
+        JsonNode jsonTable = tableChange.get("table");
+        String tableName = tableChange.get("id").asText();
+        ArrayNode columns = (ArrayNode) jsonTable.get("columns");
+        LinkedHashMap<String, DataType> fields = new LinkedHashMap<>();
+
+        for (JsonNode element : columns) {
+            Integer precision = element.has("length") ? 
element.get("length").asInt() : null;
+            Integer scale = element.has("scale") ? 
element.get("scale").asInt() : null;
+            fields.put(
+                    element.get("name").asText(),
+                    // TODO : add table comment and column comment when we 
upgrade flink cdc to 2.4
+                    MySqlTypeUtils.toDataType(
+                                    element.get("typeExpression").asText(),
+                                    precision,
+                                    scale,
+                                    
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue())
+                            .copy(element.get("optional").asBoolean()));
         }
 
-        List<String> primaryKeys = statement.getPrimaryKeyNames();
+        ArrayNode arrayNode = (ArrayNode) 
jsonTable.get("primaryKeyColumnNames");
+        List<String> primaryKeys = new ArrayList<>();
+        for (JsonNode primary : arrayNode) {
+            primaryKeys.add(primary.asText());
+        }
 
         if (!caseSensitive) {
-            LinkedHashMap<String, Tuple2<DataType, String>> tmp = new 
LinkedHashMap<>();
-            for (Map.Entry<String, Tuple2<DataType, String>> entry : 
fields.entrySet()) {
+            LinkedHashMap<String, DataType> tmp = new LinkedHashMap<>();
+            for (Map.Entry<String, DataType> entry : fields.entrySet()) {
                 String fieldName = entry.getKey();
                 checkArgument(
                         !tmp.containsKey(fieldName.toLowerCase()),
                         "Duplicate key '%s' in table '%s' appears when 
converting fields map keys to case-insensitive form.",
                         fieldName,
-                        statement.getTableName());
+                        tableName);
                 tmp.put(fieldName.toLowerCase(), entry.getValue());
             }
             fields = tmp;
@@ -107,8 +92,8 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<MySqlCreat
 
         Schema.Builder builder = Schema.newBuilder();
         builder.options(tableConfig);
-        for (Map.Entry<String, Tuple2<DataType, String>> entry : 
fields.entrySet()) {
-            builder.column(entry.getKey(), entry.getValue().f0, 
entry.getValue().f1);
+        for (Map.Entry<String, DataType> entry : fields.entrySet()) {
+            builder.column(entry.getKey(), entry.getValue());
         }
         Schema schema = builder.primaryKey(primaryKeys).build();
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 1606cab2a..0595d6fa0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -823,24 +823,33 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
             waitForResult(Collections.singletonList("+I[1, one]"), table1, 
rowType, primaryKeys);
 
             // create new tables at runtime
-            // synchronized table: t2
+            // synchronized table: t2, t22
             statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), 
PRIMARY KEY (k))");
             statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
-            // not synchronized tables: ta, t3
+
+            statement.executeUpdate("CREATE TABLE t22 LIKE t2");
+            statement.executeUpdate("INSERT INTO t22 VALUES (1, 'Hello')");
+
+            // not synchronized tables: ta, t3, t4
             statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10), 
PRIMARY KEY (k))");
             statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
             statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
             statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
+            statement.executeUpdate("CREATE TABLE t4 SELECT * FROM t2");
 
             statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
             waitForResult(Arrays.asList("+I[1, one]", "+I[2, two]"), table1, 
rowType, primaryKeys);
 
             // check tables
-            assertTableExists(Arrays.asList("t1", "t2"));
-            assertTableNotExists(Arrays.asList("a", "ta", "t3"));
+            assertTableExists(Arrays.asList("t1", "t2", "t22"));
+            assertTableNotExists(Arrays.asList("a", "ta", "t3", "t4"));
 
             FileStoreTable newTable = getFileStoreTable("t2");
             waitForResult(Collections.singletonList("+I[1, Hi]"), newTable, 
rowType, primaryKeys);
+
+            newTable = getFileStoreTable("t22");
+            waitForResult(
+                    Collections.singletonList("+I[1, Hello]"), newTable, 
rowType, primaryKeys);
         }
     }
 

Reply via email to