This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new db76d2a3 [improve]The sql_parser model of schema change support automatic table creation (#435)
db76d2a3 is described below
commit db76d2a38d73d5c3f3dc2912aed4b270467fb074
Author: wudongliang <[email protected]>
AuthorDate: Tue Jul 23 11:30:54 2024 +0800
    [improve]The sql_parser model of schema change support automatic table creation (#435)
---
.../doris/flink/catalog/doris/FieldSchema.java | 18 +++
.../doris/flink/catalog/doris/TableSchema.java | 27 ++++
.../flink/sink/schema/SQLParserSchemaManager.java | 155 +++++++++++++++++++--
.../jsondebezium/JsonDebeziumChangeUtils.java | 37 +++++
.../jsondebezium/JsonDebeziumSchemaChange.java | 8 ++
.../JsonDebeziumSchemaChangeImplV2.java | 48 +------
.../jsondebezium/SQLParserSchemaChange.java | 32 ++++-
.../sink/schema/SQLParserSchemaManagerTest.java | 138 ++++++++++++++++++
.../jsondebezium/TestJsonDebeziumChangeUtils.java | 43 ++++++
.../TestJsonDebeziumSchemaChangeImplV2.java | 13 --
.../jsondebezium/TestSQLParserSchemaChange.java | 42 +++++-
.../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 154 ++++++++++++++++++++
12 files changed, 641 insertions(+), 74 deletions(-)
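In short: this change teaches the SQL_PARSER schema-change mode (which parses upstream DDL with JSQLParser instead of relying on Debezium's structured payload) to build a Doris TableSchema from a source CREATE TABLE statement and create the table automatically. A minimal, jshell-style sketch of the new entry point, assuming a MySQL source and empty table properties; the DDL and target identifier below are illustrative, not taken from the commit:

    import java.util.HashMap;
    import org.apache.doris.flink.catalog.doris.TableSchema;
    import org.apache.doris.flink.sink.schema.SQLParserSchemaManager;
    import org.apache.doris.flink.tools.cdc.SourceConnector;

    SQLParserSchemaManager schemaManager = new SQLParserSchemaManager();
    String ddl = "CREATE TABLE t1 (id INT NOT NULL, name VARCHAR(50), PRIMARY KEY (id))";
    // The target must be a two-part "database.table" identifier; the method
    // enforces this via Preconditions.checkArgument(dbTable.length == 2).
    TableSchema schema =
            schemaManager.parseCreateTableStatement(
                    SourceConnector.MYSQL, ddl, "doris_db.t1", new HashMap<>());
    // A primary key yields the UNIQUE data model keyed on id; without one the
    // schema falls back to DUPLICATE, distributed on the first column.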
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
index 3d7f2765..a8d85e1f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
@@ -69,4 +69,22 @@ public class FieldSchema {
public void setDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
}
+
+ @Override
+ public String toString() {
+ return "FieldSchema{"
+ + "name='"
+ + name
+ + '\''
+ + ", typeString='"
+ + typeString
+ + '\''
+ + ", defaultValue='"
+ + defaultValue
+ + '\''
+ + ", comment='"
+ + comment
+ + '\''
+ + '}';
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
index 4cc9098f..3a47a044 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -106,4 +106,31 @@ public class TableSchema {
public Integer getTableBuckets() {
return tableBuckets;
}
+
+ @Override
+ public String toString() {
+ return "TableSchema{"
+ + "database='"
+ + database
+ + '\''
+ + ", table='"
+ + table
+ + '\''
+ + ", tableComment='"
+ + tableComment
+ + '\''
+ + ", fields="
+ + fields
+ + ", keys="
+ + String.join(",", keys)
+ + ", model="
+ + model.name()
+ + ", distributeKeys="
+ + String.join(",", distributeKeys)
+ + ", properties="
+ + properties
+ + ", tableBuckets="
+ + tableBuckets
+ + '}';
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
index 6f157cdc..5acedfc2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.schema;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
@@ -27,22 +28,32 @@ import net.sf.jsqlparser.statement.alter.AlterExpression;
import net.sf.jsqlparser.statement.alter.AlterExpression.ColumnDataType;
import net.sf.jsqlparser.statement.alter.AlterOperation;
import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import net.sf.jsqlparser.statement.create.table.Index;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
-/** Use {@link net.sf.jsqlparser.parser.CCJSqlParserUtil} to parse SQL statements. */
+/** Use {@link CCJSqlParserUtil} to parse SQL statements. */
public class SQLParserSchemaManager implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(SQLParserSchemaManager.class);
private static final String DEFAULT = "DEFAULT";
private static final String COMMENT = "COMMENT";
+ private static final String PRIMARY = "PRIMARY";
+ private static final String PRIMARY_KEY = "PRIMARY KEY";
+ private static final String UNIQUE = "UNIQUE";
/**
     * Doris' schema change only supports ADD, DROP, and RENAME operations. This method is only used
@@ -96,6 +107,128 @@ public class SQLParserSchemaManager implements Serializable {
return ddlList;
}
+    public TableSchema parseCreateTableStatement(
+            SourceConnector sourceConnector,
+            String ddl,
+            String dorisTable,
+            Map<String, String> tableProperties) {
+        try {
+            Statement statement = CCJSqlParserUtil.parse(ddl);
+            if (statement instanceof CreateTable) {
+                CreateTable createTable = (CreateTable) statement;
+                Map<String, FieldSchema> columnFields = new LinkedHashMap<>();
+                List<String> pkKeys = new ArrayList<>();
+                createTable
+                        .getColumnDefinitions()
+                        .forEach(
+                                column -> {
+                                    String columnName = column.getColumnName();
+                                    ColDataType colDataType = column.getColDataType();
+                                    String dataType = parseDataType(colDataType, sourceConnector);
+                                    List<String> columnSpecs = column.getColumnSpecs();
+                                    String defaultValue = extractDefaultValue(columnSpecs);
+                                    String comment = extractComment(columnSpecs);
+                                    FieldSchema fieldSchema =
+                                            new FieldSchema(
+                                                    columnName, dataType, defaultValue, comment);
+                                    columnFields.put(columnName, fieldSchema);
+                                    extractColumnPrimaryKey(columnName, columnSpecs, pkKeys);
+                                });
+
+                List<Index> indexes = createTable.getIndexes();
+                extractIndexesPrimaryKey(indexes, pkKeys);
+
+                String[] dbTable = dorisTable.split("\\.");
+                Preconditions.checkArgument(dbTable.length == 2);
+                TableSchema tableSchema = new TableSchema();
+                tableSchema.setDatabase(dbTable[0]);
+                tableSchema.setTable(dbTable[1]);
+                tableSchema.setModel(pkKeys.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);
+                tableSchema.setFields(columnFields);
+                tableSchema.setKeys(pkKeys);
+                tableSchema.setTableComment(
+                        extractTableComment(createTable.getTableOptionsStrings()));
+                tableSchema.setDistributeKeys(
+                        JsonDebeziumChangeUtils.buildDistributeKeys(pkKeys, columnFields));
+                tableSchema.setProperties(tableProperties);
+                if (tableProperties.containsKey("table-buckets")) {
+                    String tableBucketsConfig = tableProperties.get("table-buckets");
+                    Map<String, Integer> tableBuckets =
+                            DatabaseSync.getTableBuckets(tableBucketsConfig);
+                    Integer buckets =
+                            JsonDebeziumChangeUtils.getTableSchemaBuckets(
+                                    tableBuckets, tableSchema.getTable());
+                    tableSchema.setTableBuckets(buckets);
+                }
+                return tableSchema;
+            } else {
+                LOG.warn(
+                        "Unsupported statement type. ddl={}, sourceConnector={}, dorisTable={}",
+                        ddl,
+                        sourceConnector.getConnectorName(),
+                        dorisTable);
+            }
+        } catch (JSQLParserException e) {
+            LOG.warn(
+                    "Failed to parse create table statement. ddl={}, sourceConnector={}, dorisTable={}",
+                    ddl,
+                    sourceConnector.getConnectorName(),
+                    dorisTable);
+        }
+        return null;
+    }
+
+    private void extractIndexesPrimaryKey(List<Index> indexes, List<String> pkKeys) {
+        if (CollectionUtils.isEmpty(indexes)) {
+            return;
+        }
+        indexes.stream()
+                .filter(
+                        index ->
+                                PRIMARY_KEY.equalsIgnoreCase(index.getType())
+                                        || UNIQUE.equalsIgnoreCase(index.getType()))
+                .flatMap(index -> index.getColumnsNames().stream())
+                .distinct()
+                .filter(
+                        primaryKey ->
+                                pkKeys.stream()
+                                        .noneMatch(pkKey -> pkKey.equalsIgnoreCase(primaryKey)))
+                .forEach(pkKeys::add);
+    }
+
+    private void extractColumnPrimaryKey(
+            String columnName, List<String> columnSpecs, List<String> pkKeys) {
+        if (CollectionUtils.isEmpty(columnSpecs)) {
+            return;
+        }
+        for (String columnSpec : columnSpecs) {
+            if (PRIMARY.equalsIgnoreCase(columnSpec)) {
+                pkKeys.add(columnName);
+            }
+        }
+    }
+
+ private String extractTableComment(List<String> tableOptionsStrings) {
+ if (CollectionUtils.isEmpty(tableOptionsStrings)) {
+ return null;
+ }
+ return extractAdjacentString(tableOptionsStrings, COMMENT);
+ }
+
+    private String parseDataType(ColDataType colDataType, SourceConnector sourceConnector) {
+        String dataType = colDataType.getDataType();
+        int length = 0;
+        int scale = 0;
+        if (CollectionUtils.isNotEmpty(colDataType.getArgumentsStringList())) {
+            List<String> argumentsStringList = colDataType.getArgumentsStringList();
+            length = Integer.parseInt(argumentsStringList.get(0));
+            if (argumentsStringList.size() == 2) {
+                scale = Integer.parseInt(argumentsStringList.get(1));
+            }
+        }
+        return JsonDebeziumChangeUtils.buildDorisTypeName(sourceConnector, dataType, length, scale);
+    }
+
    private String processDropColumnOperation(AlterExpression alterExpression, String dorisTable) {
        String dropColumnDDL =
                SchemaChangeHelper.buildDropColumnDDL(dorisTable, alterExpression.getColumnName());
@@ -110,19 +243,7 @@ public class SQLParserSchemaManager implements Serializable {
for (ColumnDataType columnDataType : colDataTypeList) {
String columnName = columnDataType.getColumnName();
ColDataType colDataType = columnDataType.getColDataType();
-            String datatype = colDataType.getDataType();
-            Integer length = null;
-            Integer scale = null;
-            if (CollectionUtils.isNotEmpty(colDataType.getArgumentsStringList())) {
-                List<String> argumentsStringList = colDataType.getArgumentsStringList();
-                length = Integer.parseInt(argumentsStringList.get(0));
-                if (argumentsStringList.size() == 2) {
-                    scale = Integer.parseInt(argumentsStringList.get(1));
-                }
-            }
-            datatype =
-                    JsonDebeziumChangeUtils.buildDorisTypeName(
-                            sourceConnector, datatype, length, scale);
+            String datatype = parseDataType(colDataType, sourceConnector);
List<String> columnSpecs = columnDataType.getColumnSpecs();
String defaultValue = extractDefaultValue(columnSpecs);
@@ -161,6 +282,9 @@ public class SQLParserSchemaManager implements Serializable {
@VisibleForTesting
public String extractDefaultValue(List<String> columnSpecs) {
+ if (CollectionUtils.isEmpty(columnSpecs)) {
+ return null;
+ }
return extractAdjacentString(columnSpecs, DEFAULT);
}
@@ -185,6 +309,9 @@ public class SQLParserSchemaManager implements Serializable {
@VisibleForTesting
public String extractComment(List<String> columnSpecs) {
+ if (CollectionUtils.isEmpty(columnSpecs)) {
+ return null;
+ }
return extractAdjacentString(columnSpecs, COMMENT);
}
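
Worth noting for reviewers: primary keys are collected on two paths, per-column specs (extractColumnPrimaryKey, matching a bare PRIMARY spec token) and table-level constraints (extractIndexesPrimaryKey, matching PRIMARY KEY and UNIQUE index types, case-insensitively de-duplicated). A hedged, jshell-style sketch of what JSQLParser hands these methods, assuming the tokenization the code above relies on:

    import net.sf.jsqlparser.parser.CCJSqlParserUtil;
    import net.sf.jsqlparser.statement.Statement;
    import net.sf.jsqlparser.statement.create.table.CreateTable;

    Statement stmt =
            CCJSqlParserUtil.parse(
                    "CREATE TABLE t (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE (email))");
    CreateTable ct = (CreateTable) stmt;
    // Column path: the specs of "id" include the PRIMARY token, so id joins pkKeys.
    ct.getColumnDefinitions()
            .forEach(c -> System.out.println(c.getColumnName() + " -> " + c.getColumnSpecs()));
    // Index path: the UNIQUE constraint surfaces via getIndexes(); its column
    // name (email) is merged into pkKeys as well.
    ct.getIndexes().forEach(i -> System.out.println(i.getType() + " -> " + i.getColumnsNames()));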
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
index 36acecd4..0f176914 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -30,7 +31,12 @@ import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Pattern;
import static org.apache.doris.flink.tools.cdc.SourceConnector.MYSQL;
import static org.apache.doris.flink.tools.cdc.SourceConnector.ORACLE;
@@ -95,4 +101,35 @@ public class JsonDebeziumChangeUtils {
}
return dorisTypeName;
}
+
+ public static List<String> buildDistributeKeys(
+ List<String> primaryKeys, Map<String, FieldSchema> fields) {
+ if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
+ return primaryKeys;
+ }
+ if (!fields.isEmpty()) {
+            Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
+ return Collections.singletonList(firstField.getKey());
+ }
+ return new ArrayList<>();
+ }
+
+ public static Integer getTableSchemaBuckets(
+ Map<String, Integer> tableBucketsMap, String tableName) {
+ if (tableBucketsMap != null) {
+            // Firstly, if the table name is in the table-buckets map, set the buckets of the table.
+ if (tableBucketsMap.containsKey(tableName)) {
+ return tableBucketsMap.get(tableName);
+ }
+            // Secondly, iterate over the map to find a corresponding regular expression match,
+ for (Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
+
+ Pattern pattern = Pattern.compile(entry.getKey());
+ if (pattern.matcher(tableName).matches()) {
+ return entry.getValue();
+ }
+ }
+ }
+ return null;
+ }
}
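
Both helpers moved here from JsonDebeziumSchemaChangeImplV2 so the SQL parser path can reuse them. A usage sketch of the bucket lookup, mirroring the new unit test further down (exact table name first, then each map key tried as a regular expression):

    Map<String, Integer> buckets = new HashMap<>();
    buckets.put("tbl1", 10);  // exact-name entry
    buckets.put("tbl2.*", 4); // regex entry
    JsonDebeziumChangeUtils.getTableSchemaBuckets(buckets, "tbl1");  // -> 10
    JsonDebeziumChangeUtils.getTableSchemaBuckets(buckets, "tbl21"); // -> 4, via tbl2.*
    JsonDebeziumChangeUtils.getTableSchemaBuckets(buckets, "other"); // -> null (caller uses default)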
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index a2164b72..15817da6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -66,6 +66,9 @@ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
protected SchemaChangeManager schemaChangeManager;
protected JsonDebeziumChangeContext changeContext;
protected SourceConnector sourceConnector;
+ protected String targetDatabase;
+ protected String targetTablePrefix;
+ protected String targetTableSuffix;
public abstract boolean schemaChange(JsonNode recordRoot);
@@ -189,6 +192,11 @@ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
}
}
+    protected String getCreateTableIdentifier(JsonNode record) {
+        String table = extractJsonNode(record.get("source"), "table");
+        return targetDatabase + "." + targetTablePrefix + table + targetTableSuffix;
+    }
+
public Map<String, String> getTableMapping() {
return tableMapping;
}
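
The prefix/suffix fields move up from the V2 implementation so that any schema-change mode can derive the target table name. A small worked example with illustrative values, not taken from the commit:

    // targetDatabase = "doris", targetTablePrefix = "ods_", targetTableSuffix = "",
    // and a record whose source.table is "orders":
    String identifier = "doris" + "." + "ods_" + "orders" + "";
    // getCreateTableIdentifier(record) -> "doris.ods_orders"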
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 7ef975e2..f44602c9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -19,7 +19,6 @@ package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -45,13 +44,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
@@ -73,9 +70,6 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
    private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new LinkedHashMap<>();
// create table properties
private final Map<String, String> tableProperties;
- private final String targetDatabase;
- private final String targetTablePrefix;
- private final String targetTableSuffix;
private final Set<String> filledTables = new HashSet<>();
    public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
@@ -240,7 +234,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
TableSchema tableSchema = new TableSchema();
tableSchema.setFields(field);
tableSchema.setKeys(pkList);
-        tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
+        tableSchema.setDistributeKeys(JsonDebeziumChangeUtils.buildDistributeKeys(pkList, field));
        tableSchema.setTableComment(tblComment);
        tableSchema.setProperties(tableProperties);
        tableSchema.setModel(pkList.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);
@@ -252,48 +246,14 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
if (tableProperties.containsKey("table-buckets")) {
String tableBucketsConfig = tableProperties.get("table-buckets");
Map<String, Integer> tableBuckets =
DatabaseSync.getTableBuckets(tableBucketsConfig);
- Integer buckets = getTableSchemaBuckets(tableBuckets,
tableSchema.getTable());
+ Integer buckets =
+ JsonDebeziumChangeUtils.getTableSchemaBuckets(
+ tableBuckets, tableSchema.getTable());
tableSchema.setTableBuckets(buckets);
}
return tableSchema;
}
-    @VisibleForTesting
-    public Integer getTableSchemaBuckets(Map<String, Integer> tableBucketsMap, String tableName) {
-        if (tableBucketsMap != null) {
-            // Firstly, if the table name is in the table-buckets map, set the buckets of the table.
-            if (tableBucketsMap.containsKey(tableName)) {
-                return tableBucketsMap.get(tableName);
-            }
-            // Secondly, iterate over the map to find a corresponding regular expression match,
-            for (Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
-
-                Pattern pattern = Pattern.compile(entry.getKey());
-                if (pattern.matcher(tableName).matches()) {
-                    return entry.getValue();
-                }
-            }
-        }
-        return null;
-    }
-
-    private List<String> buildDistributeKeys(
-            List<String> primaryKeys, Map<String, FieldSchema> fields) {
-        if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
-            return primaryKeys;
-        }
-        if (!fields.isEmpty()) {
-            Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
-            return Collections.singletonList(firstField.getKey());
-        }
-        return new ArrayList<>();
-    }
-
-    private String getCreateTableIdentifier(JsonNode record) {
-        String table = extractJsonNode(record.get("source"), "table");
-        return targetDatabase + "." + targetTablePrefix + table + targetTableSuffix;
-    }
-
    private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
throws IOException, IllegalArgumentException {
Map<String, Object> param =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index 6be3f72c..7e44acc6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.sink.schema.SQLParserSchemaManager;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.EventType;
@@ -42,6 +43,15 @@ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
this.sqlParserSchemaManager = new SQLParserSchemaManager();
this.tableMapping = changeContext.getTableMapping();
this.objectMapper = changeContext.getObjectMapper();
+ this.targetDatabase = changeContext.getTargetDatabase();
+ this.targetTablePrefix =
+ changeContext.getTargetTablePrefix() == null
+ ? ""
+ : changeContext.getTargetTablePrefix();
+ this.targetTableSuffix =
+ changeContext.getTargetTableSuffix() == null
+ ? ""
+ : changeContext.getTargetTableSuffix();
}
@Override
@@ -64,8 +74,16 @@ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
}
if (eventType.equals(EventType.CREATE)) {
- // TODO support auto create table
- LOG.warn("Not auto support create table. recordRoot={}",
recordRoot);
+ String dorisTable = getCreateTableIdentifier(recordRoot);
+ TableSchema tableSchema =
tryParseCreateTableStatement(recordRoot, dorisTable);
+ status = schemaChangeManager.createTable(tableSchema);
+ if (status) {
+ String cdcTbl = getCdcTableIdentifier(recordRoot);
+ String dorisTbl = getCreateTableIdentifier(recordRoot);
+ changeContext.getTableMapping().put(cdcTbl, dorisTbl);
+ this.tableMapping = changeContext.getTableMapping();
+ LOG.info("create table ddl status: {}", status);
+ }
} else if (eventType.equals(EventType.ALTER)) {
            Tuple2<String, String> dorisTableTuple = getDorisTableTuple(recordRoot);
if (dorisTableTuple == null) {
@@ -81,6 +99,16 @@ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
return status;
}
+    @VisibleForTesting
+    public TableSchema tryParseCreateTableStatement(JsonNode record, String dorisTable)
+            throws IOException {
+        JsonNode historyRecord = extractHistoryRecord(record);
+        String ddl = extractJsonNode(historyRecord, "ddl");
+        extractSourceConnector(record);
+        return sqlParserSchemaManager.parseCreateTableStatement(
+                sourceConnector, ddl, dorisTable, changeContext.getTableProperties());
+    }
+
@VisibleForTesting
    public List<String> tryParserAlterDDLs(JsonNode record) throws IOException {
String dorisTable =
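
For orientation, the CREATE branch added above condenses to the following flow (a restatement of the diff, not new behavior):

    // EventType.CREATE inside schemaChange():
    String dorisTable = getCreateTableIdentifier(recordRoot); // db "." + prefix + table + suffix
    TableSchema tableSchema = tryParseCreateTableStatement(recordRoot, dorisTable);
    boolean status = schemaChangeManager.createTable(tableSchema);
    if (status) {
        // refresh the cdc-table -> doris-table mapping so later DML routes correctly
        changeContext.getTableMapping().put(getCdcTableIdentifier(recordRoot), dorisTable);
    }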
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
index cbe3f08a..941bca46 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.sink.schema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.junit.Assert;
import org.junit.Before;
@@ -25,6 +26,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
public class SQLParserSchemaManagerTest {
@@ -203,4 +205,140 @@ public class SQLParserSchemaManagerTest {
Assert.assertEquals("a\"bc\"d",
schemaManager.removeContinuousChar("\"a\"bc\"d\"", '\"'));
Assert.assertEquals("abc", schemaManager.removeContinuousChar("'abc'",
'\''));
}
+
+ @Test
+ public void testParseCreateTableStatement() {
+ String dorisTable = "doris.auto_tab";
+        String ddl =
+                "CREATE TABLE `test_sinka` (\n"
+                        + " `id` int NOT NULL DEFAULT '10000' COMMENT 'id_test',\n"
+                        + " `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3),\n"
+                        + " `c1` int DEFAULT '999',\n"
+                        + " `decimal_type` decimal(9,3) DEFAULT '1.000' COMMENT 'decimal_tes',\n"
+                        + " `aaa` varchar(100) DEFAULT NULL,\n"
+                        + " `decimal_type3` decimal(38,9) DEFAULT '1.123456789' COMMENT 'comment_test',\n"
+                        + " `create_time3` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n"
+                        + " PRIMARY KEY (`id`)\n"
+                        + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.MYSQL, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.0 [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
+
+ @Test
+ public void testParseCreateUniqueTableStatement() {
+ String dorisTable = "doris.auto_uni_tab";
+        String ddl =
+                "CREATE TABLE test_sink_unique ( id INT NOT NULL, name VARCHAR(100) NOT NULL, age INT, email VARCHAR(100), UNIQUE (email) )";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.MYSQL, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_uni_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distribut [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
+
+ @Test
+ public void testParseCreateDuplicateTableStatement() {
+ String dorisTable = "doris.auto_duptab";
+ String ddl =
+ "CREATE TABLE test_sink_duplicate (\n"
+ + " id INT,\n"
+ + " name VARCHAR(50),\n"
+ + " age INT,\n"
+ + " address VARCHAR(255)\n"
+ + ")";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.MYSQL, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_duptab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=, model=DUPLICATE, distribu [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
+
+ @Test
+ public void testParseOracleTableStatement() {
+ String dorisTable = "doris.auto_tab";
+ String ddl =
+ "CREATE TABLE employees (\n"
+ + " employee_id NUMBER(10) NOT NULL,\n"
+ + " first_name VARCHAR2(50),\n"
+ + " last_name VARCHAR2(50) NOT NULL,\n"
+ + " email VARCHAR2(100) UNIQUE,\n"
+ + " phone_number VARCHAR2(20),\n"
+ + " hire_date DATE DEFAULT SYSDATE NOT NULL,\n"
+ + " job_id VARCHAR2(10) NOT NULL,\n"
+ + " salary NUMBER(8, 2),\n"
+ + " commission_pct NUMBER(2, 2),\n"
+ + " manager_id NUMBER(10),\n"
+ + " department_id NUMBER(10),\n"
+ + " CONSTRAINT pk_employee PRIMARY KEY
(employee_id),\n"
+ + " CONSTRAINT fk_department FOREIGN KEY
(department_id)\n"
+ + " REFERENCES departments(department_id)\n"
+ + ");";
+ TableSchema tableSchema =
+ schemaManager.parseCreateTableStatement(
+ SourceConnector.ORACLE, ddl, dorisTable, new
HashMap<>());
+
+ String expected =
+ "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={employee_id=FieldSchema{name='employee_id',
typeString='BIGINT', defaultValue='null', comment='null'},
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)',
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name',
typeString='VARCHAR(150)', defaultValue='null', comment='null'},
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null',
com [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
+
+ @Test
+ public void testParseOraclePrimaryTableStatement() {
+ String dorisTable = "doris.auto_tab";
+ String ddl =
+ "CREATE TABLE employees (\n"
+ + " employee_id NUMBER(10) PRIMARY KEY,\n"
+ + " first_name VARCHAR2(50),\n"
+ + " last_name VARCHAR2(50) NOT NULL,\n"
+ + " email VARCHAR2(100),\n"
+ + " phone_number VARCHAR2(20),\n"
+ + " hire_date DATE DEFAULT SYSDATE NOT NULL,\n"
+ + " job_id VARCHAR2(10) NOT NULL,\n"
+ + " salary NUMBER(8, 2),\n"
+ + " commission_pct NUMBER(2, 2),\n"
+ + " manager_id NUMBER(10),\n"
+ + " department_id NUMBER(10)\n"
+ + ");";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.ORACLE, ddl, dorisTable, new HashMap<>());
+
+        String expected =
+                "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', com [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
+
+ @Test
+ public void testParseOracleDuplicateTableStatement() {
+ String dorisTable = "doris.auto_tab";
+ String ddl =
+ "CREATE TABLE orders (\n"
+ + " order_id NUMBER(10) NOT NULL,\n"
+ + " customer_id NUMBER(10) NOT NULL,\n"
+ + " order_date DATE DEFAULT SYSDATE NOT NULL,\n"
+ + " status VARCHAR2(20) CHECK (status IN
('PENDING', 'SHIPPED', 'DELIVERED', 'CANCELLED')),\n"
+ + " total_amount NUMBER(12, 2) NOT NULL,\n"
+ + " shipping_address VARCHAR2(255),\n"
+ + " delivery_date DATE,\n"
+ + " CONSTRAINT fk_customer FOREIGN KEY
(customer_id)\n"
+ + " REFERENCES customers(customer_id),\n"
+ + " CONSTRAINT chk_total_amount CHECK (total_amount
>= 0)\n"
+ + ");";
+ TableSchema tableSchema =
+ schemaManager.parseCreateTableStatement(
+ SourceConnector.ORACLE, ddl, dorisTable, new
HashMap<>());
+
+ String expected =
+ "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={order_id=FieldSchema{name='order_id',
typeString='BIGINT', defaultValue='null', comment='null'},
customer_id=FieldSchema{name='customer_id', typeString='BIGINT',
defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date',
typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'},
status=FieldSchema{name='status', typeString='VARCHAR(60)',
defaultValue='null', comment=' [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java
new file mode 100644
index 00000000..c2a93893
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestJsonDebeziumChangeUtils {
+
+ @Test
+ public void testGetTableSchemaBuckets() {
+        Assert.assertNull(JsonDebeziumChangeUtils.getTableSchemaBuckets(null, null));
+        Map<String, Integer> map = new HashMap<>();
+        Assert.assertNull(JsonDebeziumChangeUtils.getTableSchemaBuckets(map, null));
+        map.put("tbl1", 1);
+        Assert.assertEquals(
+                JsonDebeziumChangeUtils.getTableSchemaBuckets(map, "tbl1").intValue(), 1);
+        map = new HashMap<>();
+        map.put("tbl2.*", 1);
+        Assert.assertEquals(
+                JsonDebeziumChangeUtils.getTableSchemaBuckets(map, "tbl2").intValue(), 1);
+        Assert.assertEquals(
+                JsonDebeziumChangeUtils.getTableSchemaBuckets(map, "tbl21").intValue(), 1);
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 041ebe51..a3c24d15 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -541,19 +541,6 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
return filedSchemaMap;
}
- @Test
- public void testGetTableSchemaBuckets() {
- Assert.assertNull(schemaChange.getTableSchemaBuckets(null, null));
- Map<String, Integer> map = new HashMap<>();
- Assert.assertNull(schemaChange.getTableSchemaBuckets(map, null));
- map.put("tbl1", 1);
-        Assert.assertEquals(schemaChange.getTableSchemaBuckets(map, "tbl1").intValue(), 1);
-        map = new HashMap<>();
-        map.put("tbl2.*", 1);
-        Assert.assertEquals(schemaChange.getTableSchemaBuckets(map, "tbl2").intValue(), 1);
-        Assert.assertEquals(schemaChange.getTableSchemaBuckets(map, "tbl21").intValue(), 1);
- }
-
@After
public void after() {
mockRestService.close();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
index d31ab04a..7fdf97d2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
@@ -18,12 +18,14 @@
package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.catalog.doris.TableSchema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
@@ -39,7 +41,7 @@ public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
tableMapping,
null,
null,
- null,
+ new HashMap<>(),
objectMapper,
null,
lineDelimiter,
@@ -138,4 +140,42 @@ public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
                "ALTER TABLE `test`.`t1` RENAME COLUMN `create_time2` `create_time`",
changeNameList.get(0));
}
+
+ @Test
+ public void testAutoCreateTable() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1721356080787,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sinka\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":16947,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":16947,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        JsonNode recordJsonNode = objectMapper.readTree(record);
+        TableSchema tableSchema =
+                schemaChange.tryParseCreateTableStatement(recordJsonNode, "doris.auto_tab");
+
+        String expected =
+                "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.0 [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
+
+ @Test
+ public void testAutoCreateUniqueTable() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1721370593734,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink_unique\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":24279,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":24279,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\": [...]
+        JsonNode recordJsonNode = objectMapper.readTree(record);
+        TableSchema tableSchema =
+                schemaChange.tryParseCreateTableStatement(recordJsonNode, "doris.auto_unique_tab");
+        String expected =
+                "TableSchema{database='doris', table='auto_unique_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distri [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
+
+ @Test
+ public void testAutoCreateDuplicateTable() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1721370811092,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink_duplicate\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":24588,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":24588,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\ [...]
+        JsonNode recordJsonNode = objectMapper.readTree(record);
+        TableSchema tableSchema =
+                schemaChange.tryParseCreateTableStatement(
+                        recordJsonNode, "doris.auto_duplicate_tab");
+        String expected =
+                "TableSchema{database='doris', table='auto_duplicate_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=, model=DUPLICATE, d [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index ff5ce03e..ab1dfe77 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.doris.flink.DorisTestBase;
+import org.apache.doris.flink.sink.schema.SchemaChangeMode;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -61,6 +62,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
private static final String TABLE_2 = "tbl2";
private static final String TABLE_3 = "tbl3";
private static final String TABLE_4 = "tbl4";
+ private static final String TABLE_SQL_PARSE = "tbl_sql_parse";
private static final MySQLContainer MYSQL_CONTAINER =
new MySQLContainer("mysql:8.0")
@@ -240,6 +242,158 @@ public class MySQLDorisE2ECase extends DorisTestBase {
jobClient.cancel().get();
}
+ @Test
+ public void testMySQL2DorisSQLParse() throws Exception {
+ printClusterStatus();
+ initializeMySQLTable();
+ initializeDorisTable();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ Map<String, String> flinkMap = new HashMap<>();
+ flinkMap.put("execution.checkpointing.interval", "10s");
+ flinkMap.put("pipeline.operator-chaining", "false");
+ flinkMap.put("parallelism.default", "1");
+
+ Configuration configuration = Configuration.fromMap(flinkMap);
+ env.configure(configuration);
+
+ String database = DATABASE;
+ Map<String, String> mysqlConfig = new HashMap<>();
+ mysqlConfig.put("database-name", DATABASE);
+ mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
+ mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
+ mysqlConfig.put("username", MYSQL_USER);
+ mysqlConfig.put("password", MYSQL_PASSWD);
+ mysqlConfig.put("server-time-zone", "Asia/Shanghai");
+ Configuration config = Configuration.fromMap(mysqlConfig);
+
+ Map<String, String> sinkConfig = new HashMap<>();
+ sinkConfig.put("fenodes", getFenodes());
+ sinkConfig.put("username", USERNAME);
+ sinkConfig.put("password", PASSWORD);
+ sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL,
DORIS_CONTAINER.getHost()));
+ sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+ sinkConfig.put("sink.check-interval", "5000");
+ Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+ Map<String, String> tableConfig = new HashMap<>();
+ tableConfig.put("replication_num", "1");
+
+ String includingTables = "tbl.*";
+ String excludingTables = "";
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ databaseSync
+ .setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setIgnoreDefaultValue(false)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(true)
+ .setSchemaChangeMode(SchemaChangeMode.SQL_PARSER.getName())
+ // no single sink
+ .setSingleSink(true)
+ .create();
+ databaseSync.build();
+ JobClient jobClient = env.executeAsync();
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(RUNNING),
+ Deadline.fromNow(Duration.ofSeconds(10)));
+
+ // wait 2 times checkpoint
+ Thread.sleep(20000);
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2",
"doris_3,3");
+ String sql =
+ "select * from ( select * from %s.%s union all select * from
%s.%s union all select * from %s.%s ) res order by 1";
+ String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE,
TABLE_2, DATABASE, TABLE_3);
+ checkResult(expected, query1, 2);
+
+ // add incremental data
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));
+
+            statement.execute(
+                    String.format(
+                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
+ }
+
+ Thread.sleep(20000);
+        List<String> expected2 =
+                Arrays.asList(
+                        "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
+        sql =
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
+        String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+ checkResult(expected2, query2, 2);
+
+ // mock schema change
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("alter table %s.%s drop column age", DATABASE, TABLE_1));
+            Thread.sleep(20000);
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s values ('doris_1_1_1','c1_val')",
+                            DATABASE, TABLE_1));
+        }
+        Thread.sleep(20000);
+        List<String> expected3 =
+                Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
+ sql = "select * from %s.%s order by 1";
+ String query3 = String.format(sql, DATABASE, TABLE_1);
+ checkResult(expected3, query3, 2);
+
+ // mock create table
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE %s.%s ( \n"
+                                    + "`name` varchar(256) primary key,\n"
+                                    + "`age` int\n"
+                                    + ")",
+                            DATABASE, TABLE_SQL_PARSE));
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s values ('doris_1',1)", DATABASE, TABLE_SQL_PARSE));
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s values ('doris_2',2)", DATABASE, TABLE_SQL_PARSE));
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s values ('doris_3',3)", DATABASE, TABLE_SQL_PARSE));
+        }
+        Thread.sleep(20000);
+        List<String> expected4 = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+ sql = "select * from %s.%s order by 1";
+ String query4 = String.format(sql, DATABASE, TABLE_SQL_PARSE);
+ checkResult(expected4, query4, 2);
+
+ jobClient.cancel().get();
+ }
+
@Test
public void testMySQL2DorisByDefault() throws Exception {
printClusterStatus();
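
For users, the new E2E test doubles as a recipe for switching the feature on. A condensed configuration sketch, using the builder calls exactly as the test does (source/sink option maps elided):

    DatabaseSync databaseSync = new MysqlDatabaseSync();
    databaseSync
            .setEnv(env)
            .setDatabase("test")
            .setConfig(config)           // MySQL CDC options: hostname, port, credentials, ...
            .setSinkConfig(sinkConf)     // Doris options: fenodes, jdbc-url, credentials, ...
            .setTableConfig(tableConfig) // e.g. replication_num, table-buckets
            .setNewSchemaChange(true)
            .setSchemaChangeMode(SchemaChangeMode.SQL_PARSER.getName())
            .create();
    databaseSync.build();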
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]