This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ba0273087 [cdc] Parse primary key and schema by historyRecord (#1621)
ba0273087 is described below
commit ba0273087de40accec5ebca390f6e83bd8bbf048
Author: JunZhang <[email protected]>
AuthorDate: Mon Jul 24 11:35:08 2023 +0800
[cdc] Parse primary key and schema by historyRecord (#1621)
---
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 37 +++++-----
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 79 +++++++++-------------
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 17 +++--
3 files changed, 64 insertions(+), 69 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index f5a3d8b53..67a289880 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -35,15 +35,11 @@ import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.shade.guava30.com.google.common.base.Strings;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import com.alibaba.druid.sql.SQLUtils;
-import com.alibaba.druid.sql.ast.SQLStatement;
-import
com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import com.alibaba.druid.util.JdbcConstants;
+import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,7 +74,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
private final boolean caseSensitive;
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
- private final NewTableSchemaBuilder<MySqlCreateTableStatement>
schemaBuilder;
+ private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
private final Set<String> includedTables = new HashSet<>();
@@ -111,7 +107,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
ZoneId serverTimeZone,
boolean caseSensitive,
TableNameConverter tableNameConverter,
- NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+ NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
@@ -131,7 +127,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TableNameConverter tableNameConverter,
- NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+ NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
@@ -231,20 +227,25 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
try {
String historyRecordString = historyRecord.asText();
- String ddl =
objectMapper.readTree(historyRecordString).get("ddl").asText();
- if (Strings.isNullOrEmpty(ddl)) {
- return Optional.empty();
+ JsonNode tableChanges =
objectMapper.readTree(historyRecordString).get("tableChanges");
+ if (tableChanges.size() != 1) {
+ throw new IllegalArgumentException(
+ "Invalid historyRecord, because tableChanges should
contain exactly 1 item.\n"
+ + historyRecord.asText());
}
- SQLStatement statement = SQLUtils.parseSingleStatement(ddl,
JdbcConstants.MYSQL);
- if (!(statement instanceof MySqlCreateTableStatement)) {
+ JsonNode tableChange = tableChanges.get(0);
+ if (!tableChange
+ .get("type")
+ .asText()
+ .equals(TableChanges.TableChangeType.CREATE.name())) {
return Optional.empty();
}
- MySqlCreateTableStatement createTableStatement =
(MySqlCreateTableStatement) statement;
- List<String> primaryKeys =
createTableStatement.getPrimaryKeyNames();
- String tableName = createTableStatement.getTableName();
- if (primaryKeys.isEmpty()) {
+ JsonNode primaryKeyColumnNames =
tableChange.get("table").get("primaryKeyColumnNames");
+ if (primaryKeyColumnNames.size() == 0) {
+ String id = tableChange.get("id").asText();
+ String tableName = id.replaceAll("\"", "").split("\\.")[1];
LOG.debug(
"Didn't find primary keys from MySQL DDL for table
'{}'. "
+ "This table won't be synchronized.",
@@ -254,7 +255,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
return Optional.empty();
}
- return schemaBuilder.build(createTableStatement);
+ return schemaBuilder.build(tableChange);
} catch (Exception e) {
LOG.info("Failed to parse history record for schema changes", e);
return Optional.empty();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 0d9561fef..e7176c6fa 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -22,16 +22,10 @@ import
org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
-import com.alibaba.druid.sql.ast.SQLDataType;
-import com.alibaba.druid.sql.ast.SQLExpr;
-import com.alibaba.druid.sql.ast.SQLName;
-import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
-import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
-import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
-import com.alibaba.druid.sql.ast.statement.SQLTableElement;
-import
com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import org.apache.flink.api.java.tuple.Tuple2;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -42,7 +36,7 @@ import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CO
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Schema builder for MySQL cdc. */
-public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<MySqlCreateTableStatement> {
+public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode> {
private final Map<String, String> tableConfig;
private final boolean caseSensitive;
@@ -53,50 +47,41 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<MySqlCreat
}
@Override
- public Optional<Schema> build(MySqlCreateTableStatement statement) {
- List<SQLTableElement> columns = statement.getTableElementList();
- LinkedHashMap<String, Tuple2<DataType, String>> fields = new
LinkedHashMap<>();
-
- for (SQLTableElement element : columns) {
- if (element instanceof SQLColumnDefinition) {
- SQLColumnDefinition column = (SQLColumnDefinition) element;
- SQLName name = column.getName();
- SQLDataType dataType = column.getDataType();
- List<SQLExpr> arguments = dataType.getArguments();
- Integer precision = null;
- Integer scale = null;
- if (arguments.size() >= 1) {
- precision = (int) (((SQLIntegerExpr)
arguments.get(0)).getValue());
- }
-
- if (arguments.size() >= 2) {
- scale = (int) (((SQLIntegerExpr)
arguments.get(1)).getValue());
- }
-
- SQLCharExpr comment = (SQLCharExpr) column.getComment();
- fields.put(
- name.getSimpleName(),
- Tuple2.of(
- MySqlTypeUtils.toDataType(
- column.getDataType().getName(),
- precision,
- scale,
-
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue()),
- comment == null ? null :
String.valueOf(comment.getValue())));
- }
+ public Optional<Schema> build(JsonNode tableChange) {
+ JsonNode jsonTable = tableChange.get("table");
+ String tableName = tableChange.get("id").asText();
+ ArrayNode columns = (ArrayNode) jsonTable.get("columns");
+ LinkedHashMap<String, DataType> fields = new LinkedHashMap<>();
+
+ for (JsonNode element : columns) {
+ Integer precision = element.has("length") ?
element.get("length").asInt() : null;
+ Integer scale = element.has("scale") ?
element.get("scale").asInt() : null;
+ fields.put(
+ element.get("name").asText(),
+ // TODO : add table comment and column comment when we
upgrade flink cdc to 2.4
+ MySqlTypeUtils.toDataType(
+ element.get("typeExpression").asText(),
+ precision,
+ scale,
+
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue())
+ .copy(element.get("optional").asBoolean()));
}
- List<String> primaryKeys = statement.getPrimaryKeyNames();
+ ArrayNode arrayNode = (ArrayNode)
jsonTable.get("primaryKeyColumnNames");
+ List<String> primaryKeys = new ArrayList<>();
+ for (JsonNode primary : arrayNode) {
+ primaryKeys.add(primary.asText());
+ }
if (!caseSensitive) {
- LinkedHashMap<String, Tuple2<DataType, String>> tmp = new
LinkedHashMap<>();
- for (Map.Entry<String, Tuple2<DataType, String>> entry :
fields.entrySet()) {
+ LinkedHashMap<String, DataType> tmp = new LinkedHashMap<>();
+ for (Map.Entry<String, DataType> entry : fields.entrySet()) {
String fieldName = entry.getKey();
checkArgument(
!tmp.containsKey(fieldName.toLowerCase()),
"Duplicate key '%s' in table '%s' appears when
converting fields map keys to case-insensitive form.",
fieldName,
- statement.getTableName());
+ tableName);
tmp.put(fieldName.toLowerCase(), entry.getValue());
}
fields = tmp;
@@ -107,8 +92,8 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<MySqlCreat
Schema.Builder builder = Schema.newBuilder();
builder.options(tableConfig);
- for (Map.Entry<String, Tuple2<DataType, String>> entry :
fields.entrySet()) {
- builder.column(entry.getKey(), entry.getValue().f0,
entry.getValue().f1);
+ for (Map.Entry<String, DataType> entry : fields.entrySet()) {
+ builder.column(entry.getKey(), entry.getValue());
}
Schema schema = builder.primaryKey(primaryKeys).build();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 1606cab2a..0595d6fa0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -823,24 +823,33 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
waitForResult(Collections.singletonList("+I[1, one]"), table1,
rowType, primaryKeys);
// create new tables at runtime
- // synchronized table: t2
+ // synchronized table: t2, t22
statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10),
PRIMARY KEY (k))");
statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
- // not synchronized tables: ta, t3
+
+ statement.executeUpdate("CREATE TABLE t22 LIKE t2");
+ statement.executeUpdate("INSERT INTO t22 VALUES (1, 'Hello')");
+
+ // not synchronized tables: ta, t3, t4
statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10),
PRIMARY KEY (k))");
statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
+ statement.executeUpdate("CREATE TABLE t4 SELECT * FROM t2");
statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
waitForResult(Arrays.asList("+I[1, one]", "+I[2, two]"), table1,
rowType, primaryKeys);
// check tables
- assertTableExists(Arrays.asList("t1", "t2"));
- assertTableNotExists(Arrays.asList("a", "ta", "t3"));
+ assertTableExists(Arrays.asList("t1", "t2", "t22"));
+ assertTableNotExists(Arrays.asList("a", "ta", "t3", "t4"));
FileStoreTable newTable = getFileStoreTable("t2");
waitForResult(Collections.singletonList("+I[1, Hi]"), newTable,
rowType, primaryKeys);
+
+ newTable = getFileStoreTable("t22");
+ waitForResult(
+ Collections.singletonList("+I[1, Hello]"), newTable,
rowType, primaryKeys);
}
}