This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 13f1fcde [Improve]Schema change parses ddl sql using jsqlparser
framework (#422)
13f1fcde is described below
commit 13f1fcdeea2b7cf89e500539b8d888accf24c909
Author: wudongliang <[email protected]>
AuthorDate: Wed Jul 17 15:10:15 2024 +0800
[Improve]Schema change parses ddl sql using jsqlparser framework (#422)
---
flink-doris-connector/pom.xml | 6 +
.../doris/flink/catalog/doris/DorisSystem.java | 3 +
.../flink/sink/schema/SQLParserSchemaManager.java | 218 +++++++++++++++++++++
.../flink/sink/schema/SchemaChangeHelper.java | 5 +-
.../doris/flink/sink/schema/SchemaChangeMode.java | 33 ++++
.../serializer/JsonDebeziumSchemaSerializer.java | 43 +++-
.../jsondebezium/JsonDebeziumChangeContext.java | 4 +-
.../jsondebezium/JsonDebeziumChangeUtils.java | 33 ++++
.../jsondebezium/JsonDebeziumSchemaChange.java | 69 +++++++
.../jsondebezium/JsonDebeziumSchemaChangeImpl.java | 11 +-
.../JsonDebeziumSchemaChangeImplV2.java | 102 +++-------
.../jsondebezium/SQLParserSchemaChange.java | 93 +++++++++
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 2 +
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 7 +
.../sink/schema/SQLParserSchemaManagerTest.java | 206 +++++++++++++++++++
.../jsondebezium/TestSQLParserSchemaChange.java | 141 +++++++++++++
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 5 +-
.../tools/cdc/CdcOraclelSyncDatabaseCase.java | 2 +-
.../tools/cdc/CdcPostgresSyncDatabaseCase.java | 2 +-
.../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 2 +-
20 files changed, 895 insertions(+), 92 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index b5080fb0..052180c4 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -92,6 +92,7 @@ under the License.
<testcontainers.version>1.17.6</testcontainers.version>
<junit.version>4.12</junit.version>
<hamcrest.version>1.3</hamcrest.version>
+ <jsqlparser.version>4.9</jsqlparser.version>
</properties>
<dependencies>
@@ -354,6 +355,11 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.jsqlparser</groupId>
+ <artifactId>jsqlparser</artifactId>
+ <version>${jsqlparser.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index ab26e308..0d33eb9f 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -285,6 +285,9 @@ public class DorisSystem implements Serializable {
}
public static String identifier(String name) {
+ if (name.startsWith("`") && name.endsWith("`")) {
+ return name;
+ }
return "`" + name + "`";
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
new file mode 100644
index 00000000..6f157cdc
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.alter.AlterExpression;
+import net.sf.jsqlparser.statement.alter.AlterExpression.ColumnDataType;
+import net.sf.jsqlparser.statement.alter.AlterOperation;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Use {@link net.sf.jsqlparser.parser.CCJSqlParserUtil} to parse SQL
statements. */
+public class SQLParserSchemaManager implements Serializable {
+ private static final Logger LOG =
LoggerFactory.getLogger(SQLParserSchemaManager.class);
+ private static final String DEFAULT = "DEFAULT";
+ private static final String COMMENT = "COMMENT";
+
+ /**
+ * Doris' schema change only supports ADD, DROP, and RENAME operations.
This method is only used
+ * to parse the above schema change operations.
+ */
+ public List<String> parserAlterDDLs(
+ SourceConnector sourceConnector, String ddl, String dorisTable) {
+ List<String> ddlList = new ArrayList<>();
+ try {
+ Statement statement = CCJSqlParserUtil.parse(ddl);
+ if (statement instanceof Alter) {
+ Alter alterStatement = (Alter) statement;
+ List<AlterExpression> alterExpressions =
alterStatement.getAlterExpressions();
+ for (AlterExpression alterExpression : alterExpressions) {
+ AlterOperation operation = alterExpression.getOperation();
+ switch (operation) {
+ case DROP:
+ String dropColumnDDL =
+
processDropColumnOperation(alterExpression, dorisTable);
+ ddlList.add(dropColumnDDL);
+ break;
+ case ADD:
+ List<String> addColumnDDL =
+ processAddColumnOperation(
+ sourceConnector, alterExpression,
dorisTable);
+ ddlList.addAll(addColumnDDL);
+ break;
+ case CHANGE:
+ String changeColumnDDL =
+
processChangeColumnOperation(alterExpression, dorisTable);
+ ddlList.add(changeColumnDDL);
+ break;
+ case RENAME:
+ String renameColumnDDL =
+
processRenameColumnOperation(alterExpression, dorisTable);
+ ddlList.add(renameColumnDDL);
+ break;
+ default:
+ LOG.warn(
+ "Unsupported alter ddl operations,
operation={}, ddl={}",
+ operation.name(),
+ ddl);
+ }
+ }
+ } else {
+ LOG.warn("Unsupported ddl operations, ddl={}", ddl);
+ }
+ } catch (JSQLParserException e) {
+ LOG.warn("Failed to parse DDL SQL, SQL={}", ddl, e);
+ }
+ return ddlList;
+ }
+
+ private String processDropColumnOperation(AlterExpression alterExpression,
String dorisTable) {
+ String dropColumnDDL =
+ SchemaChangeHelper.buildDropColumnDDL(dorisTable,
alterExpression.getColumnName());
+ LOG.info("Parsed drop column DDL SQL is: {}", dropColumnDDL);
+ return dropColumnDDL;
+ }
+
+ private List<String> processAddColumnOperation(
+ SourceConnector sourceConnector, AlterExpression alterExpression,
String dorisTable) {
+ List<ColumnDataType> colDataTypeList =
alterExpression.getColDataTypeList();
+ List<String> addColumnList = new ArrayList<>();
+ for (ColumnDataType columnDataType : colDataTypeList) {
+ String columnName = columnDataType.getColumnName();
+ ColDataType colDataType = columnDataType.getColDataType();
+ String datatype = colDataType.getDataType();
+ Integer length = null;
+ Integer scale = null;
+ if
(CollectionUtils.isNotEmpty(colDataType.getArgumentsStringList())) {
+ List<String> argumentsStringList =
colDataType.getArgumentsStringList();
+ length = Integer.parseInt(argumentsStringList.get(0));
+ if (argumentsStringList.size() == 2) {
+ scale = Integer.parseInt(argumentsStringList.get(1));
+ }
+ }
+ datatype =
+ JsonDebeziumChangeUtils.buildDorisTypeName(
+ sourceConnector, datatype, length, scale);
+
+ List<String> columnSpecs = columnDataType.getColumnSpecs();
+ String defaultValue = extractDefaultValue(columnSpecs);
+ String comment = extractComment(columnSpecs);
+ FieldSchema fieldSchema = new FieldSchema(columnName, datatype,
defaultValue, comment);
+ String addColumnDDL =
SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema);
+ LOG.info("Parsed add column DDL SQL is: {}", addColumnDDL);
+ addColumnList.add(addColumnDDL);
+ }
+ return addColumnList;
+ }
+
+ private String processChangeColumnOperation(
+ AlterExpression alterExpression, String dorisTable) {
+ String columnNewName =
alterExpression.getColDataTypeList().get(0).getColumnName();
+ String columnOldName = alterExpression.getColumnOldName();
+ String renameColumnDDL =
+ SchemaChangeHelper.buildRenameColumnDDL(dorisTable,
columnOldName, columnNewName);
+ LOG.warn(
+ "Note: Only rename column names are supported in doris. "
+ + "Therefore, the change syntax used here only
supports the use of rename."
+ + " Parsed change column DDL SQL is: {}",
+ renameColumnDDL);
+ return renameColumnDDL;
+ }
+
+ private String processRenameColumnOperation(
+ AlterExpression alterExpression, String dorisTable) {
+ String columnNewName = alterExpression.getColumnName();
+ String columnOldName = alterExpression.getColumnOldName();
+ String renameColumnDDL =
+ SchemaChangeHelper.buildRenameColumnDDL(dorisTable,
columnOldName, columnNewName);
+ LOG.info("Parsed rename column DDL SQL is: {}", renameColumnDDL);
+ return renameColumnDDL;
+ }
+
+ @VisibleForTesting
+ public String extractDefaultValue(List<String> columnSpecs) {
+ return extractAdjacentString(columnSpecs, DEFAULT);
+ }
+
+ private String extractAdjacentString(List<String> columnSpecs, String key)
{
+ int columnSpecsSize = columnSpecs.size();
+ for (int i = 0; i < columnSpecsSize; i++) {
+ String columnSpec = columnSpecs.get(i);
+ if (key.equalsIgnoreCase(columnSpec) && i < columnSpecsSize - 1) {
+ String adjacentString = columnSpecs.get(i + 1);
+ if (!(DEFAULT.equalsIgnoreCase(adjacentString))
+ && !(COMMENT.equalsIgnoreCase(adjacentString))) {
+ return removeQuotes(adjacentString);
+ }
+ LOG.warn(
+ "Failed to extract adjacent string value.
columnSpecs={}, key={}",
+ String.join(",", columnSpecs),
+ key);
+ }
+ }
+ return null;
+ }
+
+ @VisibleForTesting
+ public String extractComment(List<String> columnSpecs) {
+ return extractAdjacentString(columnSpecs, COMMENT);
+ }
+
+ private String removeQuotes(String content) {
+ content = removeContinuousChar(content, '\'');
+ content = removeContinuousChar(content, '\"');
+ return content;
+ }
+
+ /**
+ * Removes consecutive occurrences of the target char from both ends of the string.
+ *
+ * @param str the input string
+ * @param target the char to be removed
+ * @return the string without runs of the target char on either side
+ */
+ @VisibleForTesting
+ public String removeContinuousChar(String str, char target) {
+ if (str == null || str.length() < 2) {
+ return str;
+ }
+ int start = 0;
+ int end = str.length() - 1;
+ while (start <= end && str.charAt(start) == target) {
+ start++;
+ }
+ while (end >= start && str.charAt(end) == target) {
+ end--;
+ }
+ return str.substring(start, end + 1);
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 8d365ffc..06546877 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -17,10 +17,9 @@
package org.apache.doris.flink.sink.schema;
-import org.apache.flink.util.StringUtils;
-
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
@@ -179,7 +178,7 @@ public class SchemaChangeHelper {
}
private static void commentColumn(StringBuilder ddl, String comment) {
- if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+ if (StringUtils.isNotEmpty(comment)) {
ddl.append(" COMMENT
'").append(DorisSystem.quoteComment(comment)).append("'");
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeMode.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeMode.java
new file mode 100644
index 00000000..e55a4d31
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeMode.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+public enum SchemaChangeMode {
+ DEBEZIUM_STRUCTURE("debezium_structure"),
+ SQL_PARSER("sql_parser");
+
+ private final String name;
+
+ SchemaChangeMode(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index e657864a..c1ed1de1 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -29,12 +29,14 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.schema.SchemaChangeMode;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2;
+import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.SQLParserSchemaChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +78,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private String targetTableSuffix;
private JsonDebeziumDataChange dataChange;
private JsonDebeziumSchemaChange schemaChange;
+ private SchemaChangeMode schemaChangeMode;
private final Set<String> initTableSet = new HashSet<>();
public JsonDebeziumSchemaSerializer(
@@ -120,13 +123,15 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
Map<String, String> tableProperties,
String targetDatabase,
String targetTablePrefix,
- String targetTableSuffix) {
+ String targetTableSuffix,
+ SchemaChangeMode schemaChangeMode) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange,
executionOptions);
this.tableMapping = tableMapping;
this.tableProperties = tableProperties;
this.targetDatabase = targetDatabase;
this.targetTablePrefix = targetTablePrefix;
this.targetTableSuffix = targetTableSuffix;
+ this.schemaChangeMode = schemaChangeMode;
init();
}
@@ -144,13 +149,29 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
ignoreUpdateBefore,
targetTablePrefix,
targetTableSuffix);
- this.schemaChange =
- newSchemaChange
- ? new JsonDebeziumSchemaChangeImplV2(changeContext)
- : new JsonDebeziumSchemaChangeImpl(changeContext);
+ initSchemaChangeInstance(changeContext);
this.dataChange = new JsonDebeziumDataChange(changeContext);
}
+ private void initSchemaChangeInstance(JsonDebeziumChangeContext
changeContext) {
+ if (!newSchemaChange) {
+ LOG.info(
+ "newSchemaChange set to false, instantiation schema change
uses JsonDebeziumSchemaChangeImpl.");
+ this.schemaChange = new
JsonDebeziumSchemaChangeImpl(changeContext);
+ return;
+ }
+
+ if (Objects.nonNull(schemaChangeMode)
+ && SchemaChangeMode.SQL_PARSER.equals(schemaChangeMode)) {
+ LOG.info(
+ "SchemaChangeMode set to SQL_PARSER, instantiation schema
change uses SQLParserService.");
+ this.schemaChange = new SQLParserSchemaChange(changeContext);
+ } else {
+ LOG.info("instantiation schema change uses
JsonDebeziumSchemaChangeImplV2.");
+ this.schemaChange = new
JsonDebeziumSchemaChangeImplV2(changeContext);
+ }
+ }
+
@Override
public DorisRecord serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
@@ -201,6 +222,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private Pattern addDropDDLPattern;
private String sourceTableName;
private boolean newSchemaChange = true;
+ private SchemaChangeMode schemaChangeMode;
private DorisExecutionOptions executionOptions;
private Map<String, String> tableMapping;
private Map<String, String> tableProperties;
@@ -218,6 +240,14 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return this;
}
+ public JsonDebeziumSchemaSerializer.Builder setSchemaChangeMode(String
schemaChangeMode) {
+ if
(org.apache.commons.lang3.StringUtils.isEmpty(schemaChangeMode)) {
+ return this;
+ }
+ this.schemaChangeMode =
SchemaChangeMode.valueOf(schemaChangeMode.toUpperCase());
+ return this;
+ }
+
public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern
addDropDDLPattern) {
this.addDropDDLPattern = addDropDDLPattern;
return this;
@@ -273,7 +303,8 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
tableProperties,
targetDatabase,
targetTablePrefix,
- targetTableSuffix);
+ targetTableSuffix,
+ schemaChangeMode);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index a7253d2f..2a3eebe0 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -38,8 +38,8 @@ public class JsonDebeziumChangeContext implements
Serializable {
private final Pattern pattern;
private final String lineDelimiter;
private final boolean ignoreUpdateBefore;
- private String targetTablePrefix;
- private String targetTableSuffix;
+ private final String targetTablePrefix;
+ private final String targetTableSuffix;
public JsonDebeziumChangeContext(
DorisOptions dorisOptions,
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
index 921607df..36acecd4 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
@@ -23,10 +23,20 @@ import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+import org.apache.doris.flink.tools.cdc.oracle.OracleType;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
import java.util.Map;
+import static org.apache.doris.flink.tools.cdc.SourceConnector.MYSQL;
+import static org.apache.doris.flink.tools.cdc.SourceConnector.ORACLE;
+import static org.apache.doris.flink.tools.cdc.SourceConnector.POSTGRES;
+import static org.apache.doris.flink.tools.cdc.SourceConnector.SQLSERVER;
+
public class JsonDebeziumChangeUtils {
public static String getDorisTableIdentifier(
@@ -62,4 +72,27 @@ public class JsonDebeziumChangeUtils {
? record.get(key).asText()
: null;
}
+
+ public static String buildDorisTypeName(
+ SourceConnector sourceConnector, String dataType, Integer length,
Integer scale) {
+ String dorisTypeName;
+ switch (sourceConnector) {
+ case MYSQL:
+ dorisTypeName = MysqlType.toDorisType(dataType, length, scale);
+ break;
+ case ORACLE:
+ dorisTypeName = OracleType.toDorisType(dataType, length,
scale);
+ break;
+ case POSTGRES:
+ dorisTypeName = PostgresType.toDorisType(dataType, length,
scale);
+ break;
+ case SQLSERVER:
+ dorisTypeName = SqlServerType.toDorisType(dataType, length,
scale);
+ break;
+ default:
+ String errMsg = sourceConnector + " not support " + dataType +
" schema change.";
+ throw new UnsupportedOperationException(errMsg);
+ }
+ return dorisTypeName;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index ccb20469..a2164b72 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -25,11 +25,20 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.regex.Pattern;
/**
@@ -43,6 +52,7 @@ import java.util.regex.Pattern;
* be enabled by configuring use-new-schema-change.
*/
public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
+ private static final Logger LOG =
LoggerFactory.getLogger(JsonDebeziumSchemaChange.class);
protected static String addDropDDLRegex =
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
protected Pattern addDropDDLPattern;
@@ -55,6 +65,7 @@ public abstract class JsonDebeziumSchemaChange extends
CdcSchemaChange {
protected Map<String, String> tableMapping;
protected SchemaChangeManager schemaChangeManager;
protected JsonDebeziumChangeContext changeContext;
+ protected SourceConnector sourceConnector;
public abstract boolean schemaChange(JsonNode recordRoot);
@@ -89,6 +100,12 @@ public abstract class JsonDebeziumSchemaChange extends
CdcSchemaChange {
: null;
}
+ /**
+ * Parse doris database and table as a tuple.
+ *
+ * @param record the change record received from Flink CDC.
+ * @return Tuple(database, table)
+ */
protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
String identifier =
JsonDebeziumChangeUtils.getDorisTableIdentifier(record,
dorisOptions, tableMapping);
@@ -120,6 +137,58 @@ public abstract class JsonDebeziumSchemaChange extends
CdcSchemaChange {
return record;
}
+ /** Parse event type. */
+ protected EventType extractEventType(JsonNode record) throws
JsonProcessingException {
+ JsonNode tableChange = extractTableChange(record);
+ if (tableChange == null || tableChange.get("type") == null) {
+ return null;
+ }
+ String type = tableChange.get("type").asText();
+ if (EventType.ALTER.toString().equalsIgnoreCase(type)) {
+ return EventType.ALTER;
+ } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) {
+ return EventType.CREATE;
+ }
+ LOG.warn("Not supported this event type. type={}", type);
+ return null;
+ }
+
+ protected JsonNode extractTableChange(JsonNode record) throws
JsonProcessingException {
+ JsonNode historyRecord = extractHistoryRecord(record);
+ JsonNode tableChanges = historyRecord.get("tableChanges");
+ if (Objects.nonNull(tableChanges)) {
+ return tableChanges.get(0);
+ }
+ LOG.warn("Failed to extract tableChanges. record={}", record);
+ return null;
+ }
+
+ protected boolean executeAlterDDLs(
+ List<String> ddlSqlList,
+ JsonNode recordRoot,
+ Tuple2<String, String> dorisTableTuple,
+ boolean status)
+ throws IOException, IllegalArgumentException {
+ if (CollectionUtils.isEmpty(ddlSqlList)) {
+ LOG.info("The recordRoot cannot extract ddl sql. recordRoot={}",
recordRoot);
+ return false;
+ }
+
+ for (String ddlSql : ddlSqlList) {
+ status = schemaChangeManager.execute(ddlSql, dorisTableTuple.f0);
+ LOG.info("schema change status:{}, ddl: {}", status, ddlSql);
+ }
+ return status;
+ }
+
+ protected void extractSourceConnector(JsonNode record) {
+ if (Objects.isNull(sourceConnector)) {
+ sourceConnector =
+ SourceConnector.valueOf(
+
record.get("source").get("connector").asText().toUpperCase());
+ }
+ }
+
public Map<String, String> getTableMapping() {
return tableMapping;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
index 614f06a7..09f0f3a6 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
@@ -35,7 +35,16 @@ import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-/** Use expression to match ddl sql. */
+/**
+ * Use expression to match ddl sql.
+ *
+ * <p>The way of parsing DDL statements relies on regular expression matching,
and this parsing
+ * method has many flaws. In order to solve this problem, we introduced the
com.github.jsqlparser
+ * framework, which can accurately parse the schema change of DDL.
+ *
+ * <p>This class is no longer recommended; use {@link SQLParserSchemaChange} instead.
+ */
+@Deprecated
public class JsonDebeziumSchemaChangeImpl extends JsonDebeziumSchemaChange {
private static final Logger LOG =
LoggerFactory.getLogger(JsonDebeziumSchemaChangeImpl.class);
// alter table tbl add cloumn aca int
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 9b41e2fd..7ef975e2 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -40,10 +40,6 @@ import
org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.EventType;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceConnector;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
-import org.apache.doris.flink.tools.cdc.oracle.OracleType;
-import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
-import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +51,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
@@ -74,12 +71,11 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
Pattern.CASE_INSENSITIVE);
// schemaChange saves table names, field, and field column information
private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new
LinkedHashMap<>();
- private SourceConnector sourceConnector;
// create table properties
private final Map<String, String> tableProperties;
- private String targetDatabase;
- private String targetTablePrefix;
- private String targetTableSuffix;
+ private final String targetDatabase;
+ private final String targetTablePrefix;
+ private final String targetTableSuffix;
private final Set<String> filledTables = new HashSet<>();
public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext
changeContext) {
@@ -124,6 +120,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
EventType eventType = extractEventType(recordRoot);
if (eventType == null) {
+ LOG.warn("Failed to parse eventType. recordRoot={}",
recordRoot);
return false;
}
if (eventType.equals(EventType.CREATE)) {
@@ -137,43 +134,20 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
LOG.info("create table ddl status: {}", status);
}
} else if (eventType.equals(EventType.ALTER)) {
- // db,table
- Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
- if (tuple == null) {
+ Tuple2<String, String> dorisTableTuple =
getDorisTableTuple(recordRoot);
+ if (dorisTableTuple == null) {
+ LOG.warn("Failed to get doris table tuple. record={}",
recordRoot);
return false;
}
List<String> ddlSqlList = extractDDLList(recordRoot);
- if (CollectionUtils.isEmpty(ddlSqlList)) {
- LOG.info("ddl can not do schema change:{}", recordRoot);
- return false;
- }
- List<DDLSchema> ddlSchemas =
SchemaChangeHelper.getDdlSchemas();
- for (int i = 0; i < ddlSqlList.size(); i++) {
- DDLSchema ddlSchema = ddlSchemas.get(i);
- String ddlSql = ddlSqlList.get(i);
- boolean doSchemaChange = checkSchemaChange(tuple.f0,
tuple.f1, ddlSchema);
- status = doSchemaChange &&
schemaChangeManager.execute(ddlSql, tuple.f0);
- LOG.info("schema change status:{}, ddl:{}", status,
ddlSql);
- }
- } else {
- LOG.info("Unsupported event type {}", eventType);
+ status = executeAlterDDLs(ddlSqlList, recordRoot,
dorisTableTuple, status);
}
} catch (Exception ex) {
- LOG.warn("schema change error :", ex);
+ LOG.warn("schema change error : ", ex);
}
return status;
}
- private JsonNode extractTableChange(JsonNode record) throws
JsonProcessingException {
- JsonNode historyRecord = extractHistoryRecord(record);
- JsonNode tableChanges = historyRecord.get("tableChanges");
- if (!Objects.isNull(tableChanges)) {
- JsonNode tableChange = tableChanges.get(0);
- return tableChange;
- }
- return null;
- }
-
/** Parse Alter Event. */
@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws IOException {
@@ -181,11 +155,19 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
JsonDebeziumChangeUtils.getDorisTableIdentifier(record,
dorisOptions, tableMapping);
JsonNode historyRecord = extractHistoryRecord(record);
String ddl = extractJsonNode(historyRecord, "ddl");
+ extractSourceConnector(record);
+ return parserDebeziumStructure(dorisTable, ddl, record);
+ }
+
+ private List<String> parserDebeziumStructure(String dorisTable, String
ddl, JsonNode record)
+ throws JsonProcessingException {
JsonNode tableChange = extractTableChange(record);
- EventType eventType = extractEventType(record);
- if (Objects.isNull(tableChange)
- || Objects.isNull(ddl)
- || !eventType.equals(EventType.ALTER)) {
+ if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
+ LOG.warn(
+ "tableChange or ddl is empty, cannot do schema change.
dorisTable={}, tableChange={}, ddl={}",
+ dorisTable,
+ tableChange,
+ ddl);
return null;
}
@@ -284,7 +266,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
return tableBucketsMap.get(tableName);
}
        // Secondly, iterate over the map to find a corresponding regular
expression match.
- for (Map.Entry<String, Integer> entry :
tableBucketsMap.entrySet()) {
+ for (Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
Pattern pattern = Pattern.compile(entry.getKey());
if (pattern.matcher(tableName).matches()) {
@@ -301,7 +283,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
return primaryKeys;
}
if (!fields.isEmpty()) {
- Map.Entry<String, FieldSchema> firstField =
fields.entrySet().iterator().next();
+ Entry<String, FieldSchema> firstField =
fields.entrySet().iterator().next();
return Collections.singletonList(firstField.getKey());
}
return new ArrayList<>();
@@ -320,21 +302,6 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
return schemaChangeManager.checkSchemaChange(database, table, param);
}
- /** Parse event type. */
- protected EventType extractEventType(JsonNode record) throws
JsonProcessingException {
- JsonNode tableChange = extractTableChange(record);
- if (tableChange == null || tableChange.get("type") == null) {
- return null;
- }
- String type = tableChange.get("type").asText();
- if (EventType.ALTER.toString().equalsIgnoreCase(type)) {
- return EventType.ALTER;
- } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) {
- return EventType.CREATE;
- }
- return null;
- }
-
private Map<String, Object> extractBeforeRow(JsonNode record) {
return extractRow(record.get("before"));
}
@@ -402,25 +369,8 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
int length = column.get("length") == null ? 0 :
column.get("length").asInt();
int scale = column.get("scale") == null ? 0 :
column.get("scale").asInt();
String sourceTypeName = column.get("typeName").asText();
- String dorisTypeName;
- switch (sourceConnector) {
- case MYSQL:
- dorisTypeName = MysqlType.toDorisType(sourceTypeName, length,
scale);
- break;
- case ORACLE:
- dorisTypeName = OracleType.toDorisType(sourceTypeName, length,
scale);
- break;
- case POSTGRES:
- dorisTypeName = PostgresType.toDorisType(sourceTypeName,
length, scale);
- break;
- case SQLSERVER:
- dorisTypeName = SqlServerType.toDorisType(sourceTypeName,
length, scale);
- break;
- default:
- String errMsg = "Not support " + sourceTypeName + " schema
change.";
- throw new UnsupportedOperationException(errMsg);
- }
- return dorisTypeName;
+ return JsonDebeziumChangeUtils.buildDorisTypeName(
+ sourceConnector, sourceTypeName, length, scale);
}
private String handleDefaultValue(String defaultValue) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
new file mode 100644
index 00000000..6be3f72c
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.sink.schema.SQLParserSchemaManager;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
+ private static final Logger LOG =
LoggerFactory.getLogger(SQLParserSchemaChange.class);
+ private final SQLParserSchemaManager sqlParserSchemaManager;
+
+ public SQLParserSchemaChange(JsonDebeziumChangeContext changeContext) {
+ this.changeContext = changeContext;
+ this.dorisOptions = changeContext.getDorisOptions();
+ this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+ this.sqlParserSchemaManager = new SQLParserSchemaManager();
+ this.tableMapping = changeContext.getTableMapping();
+ this.objectMapper = changeContext.getObjectMapper();
+ }
+
+ @Override
+ public void init(JsonNode recordRoot, String dorisTableName) {
+ // do nothing
+ }
+
+ @Override
+ public boolean schemaChange(JsonNode recordRoot) {
+ boolean status = false;
+ try {
+ if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) &&
!checkTable(recordRoot)) {
+ return false;
+ }
+
+ EventType eventType = extractEventType(recordRoot);
+ if (eventType == null) {
+ LOG.warn("Failed to parse eventType. recordRoot={}",
recordRoot);
+ return false;
+ }
+
+ if (eventType.equals(EventType.CREATE)) {
+ // TODO support auto create table
+ LOG.warn("Not auto support create table. recordRoot={}",
recordRoot);
+ } else if (eventType.equals(EventType.ALTER)) {
+ Tuple2<String, String> dorisTableTuple =
getDorisTableTuple(recordRoot);
+ if (dorisTableTuple == null) {
+ LOG.warn("Failed to get doris table tuple. record={}",
recordRoot);
+ return false;
+ }
+ List<String> ddlList = tryParserAlterDDLs(recordRoot);
+ status = executeAlterDDLs(ddlList, recordRoot,
dorisTableTuple, status);
+ }
+ } catch (Exception ex) {
+ LOG.warn("schema change error : ", ex);
+ }
+ return status;
+ }
+
+ @VisibleForTesting
+ public List<String> tryParserAlterDDLs(JsonNode record) throws IOException
{
+ String dorisTable =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(record,
dorisOptions, tableMapping);
+ JsonNode historyRecord = extractHistoryRecord(record);
+ String ddl = extractJsonNode(historyRecord, "ddl");
+ extractSourceConnector(record);
+ return sqlParserSchemaManager.parserAlterDDLs(sourceConnector, ddl,
dorisTable);
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 38b942ea..55e864ca 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -130,6 +130,7 @@ public class CdcTools {
String excludingTables = params.get("excluding-tables");
String multiToOneOrigin = params.get("multi-to-one-origin");
String multiToOneTarget = params.get("multi-to-one-target");
+ String schemaChangeMode = params.get("schema-change-mode");
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
boolean ignoreIncompatible = params.has("ignore-incompatible");
@@ -157,6 +158,7 @@ public class CdcTools {
.setCreateTableOnly(createTableOnly)
.setSingleSink(singleSink)
.setIgnoreIncompatible(ignoreIncompatible)
+ .setSchemaChangeMode(schemaChangeMode)
.create();
databaseSync.build();
if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 5cea70f9..9c4f2ac4 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -78,6 +78,7 @@ public abstract class DatabaseSync {
public StreamExecutionEnvironment env;
private boolean createTableOnly = false;
private boolean newSchemaChange = true;
+ private String schemaChangeMode;
protected String includingTables;
protected String excludingTables;
protected String multiToOneOrigin;
@@ -342,6 +343,7 @@ public abstract class DatabaseSync {
return JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisBuilder.build())
.setNewSchemaChange(newSchemaChange)
+ .setSchemaChangeMode(schemaChangeMode)
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
.setTableProperties(tableConfig)
@@ -560,6 +562,11 @@ public abstract class DatabaseSync {
return this;
}
+ public DatabaseSync setSchemaChangeMode(String schemaChangeMode) {
+ this.schemaChangeMode = schemaChangeMode.trim();
+ return this;
+ }
+
public DatabaseSync setSingleSink(boolean singleSink) {
this.singleSink = singleSink;
return this;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
new file mode 100644
index 00000000..cbe3f08a
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class SQLParserSchemaManagerTest {
+ private SQLParserSchemaManager schemaManager;
+ private String dorisTable;
+
+ @Before
+ public void init() {
+ schemaManager = new SQLParserSchemaManager();
+ dorisTable = "doris.tab";
+ }
+
+ @Test
+ public void testParserAlterDDLs() {
+ List<String> expectDDLs = new ArrayList<>();
+ expectDDLs.add("ALTER TABLE `doris`.`tab` DROP COLUMN `c1`");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` DROP COLUMN `c2`");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `c3` INT DEFAULT
'100'");
+ expectDDLs.add(
+ "ALTER TABLE `doris`.`tab` ADD COLUMN `decimal_type`
DECIMALV3(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment'");
+ expectDDLs.add(
+ "ALTER TABLE `doris`.`tab` ADD COLUMN `create_time`
DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP COMMENT 'time_comment'");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c10` `c11`");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c12` `c13`");
+
+ SourceConnector mysql = SourceConnector.MYSQL;
+ String ddl =
+ "alter table t1 drop c1, drop column c2, add c3 int default
100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT
'decimal_type_comment', add `create_time` datetime(3) DEFAULT
CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change
column c12 c13 varchar(10)";
+ List<String> actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl,
dorisTable);
+ for (String actualDDL : actualDDLs) {
+ Assert.assertTrue(expectDDLs.contains(actualDDL));
+ }
+ }
+
+ @Test
+ public void testParserAlterDDLsAdd() {
+ List<String> expectDDLs = new ArrayList<>();
+ expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `phone_number`
VARCHAR(60)");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `address`
VARCHAR(765)");
+
+ SourceConnector mysql = SourceConnector.ORACLE;
+ String ddl =
+ "ALTER TABLE employees ADD (phone_number VARCHAR2(20), address
VARCHAR2(255));";
+ List<String> actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl,
dorisTable);
+ for (String actualDDL : actualDDLs) {
+ Assert.assertTrue(expectDDLs.contains(actualDDL));
+ }
+ }
+
+ @Test
+ public void testParserAlterDDLsChange() {
+ List<String> expectDDLs = new ArrayList<>();
+ expectDDLs.add(
+ "ALTER TABLE `doris`.`tab` RENAME COLUMN `old_phone_number`
`new_phone_number`");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `old_address`
`new_address`");
+
+ SourceConnector mysql = SourceConnector.MYSQL;
+ String ddl =
+ "ALTER TABLE employees\n"
+ + "CHANGE COLUMN old_phone_number new_phone_number
VARCHAR(20) NOT NULL,\n"
+ + "CHANGE COLUMN old_address new_address VARCHAR(255)
DEFAULT 'Unknown',\n"
+ + "MODIFY COLUMN hire_date TIMESTAMP NOT NULL DEFAULT
CURRENT_TIMESTAMP;";
+ List<String> actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl,
dorisTable);
+ for (String actualDDL : actualDDLs) {
+ Assert.assertTrue(expectDDLs.contains(actualDDL));
+ }
+ }
+
+ @Test
+ public void testExtractCommentValue() {
+ String expectComment = "";
+ List<String> columnSpecs = Arrays.asList("default", "'100'",
"COMMENT", "''");
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertEquals(expectComment, actualComment);
+ }
+
+ @Test
+ public void testExtractCommentValueQuotes() {
+ String expectComment = "comment_test";
+ List<String> columnSpecs =
+ Arrays.asList("Default", "\"100\"", "comment",
"\"comment_test\"");
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertEquals(expectComment, actualComment);
+ }
+
+ @Test
+ public void testExtractCommentValueNull() {
+ List<String> columnSpecs = Arrays.asList("default", null, "CommenT",
null);
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertNull(actualComment);
+ }
+
+ @Test
+ public void testExtractCommentValueEmpty() {
+ List<String> columnSpecs = Arrays.asList("default", null, "comment");
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertNull(actualComment);
+ }
+
+ @Test
+ public void testExtractCommentValueA() {
+ String expectComment = "test";
+ List<String> columnSpecs = Arrays.asList("comment", "test");
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertEquals(expectComment, actualComment);
+ }
+
+ @Test
+ public void testExtractDefaultValue() {
+ String expectDefault = "100";
+ List<String> columnSpecs = Arrays.asList("default", "'100'",
"comment", "");
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertEquals(expectDefault, actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueQuotes() {
+ String expectDefault = "100";
+ List<String> columnSpecs = Arrays.asList("default", "\"100\"",
"comment", "");
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertEquals(expectDefault, actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueNull() {
+ List<String> columnSpecs = Arrays.asList("Default", null, "comment",
null);
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertNull(actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueEmpty() {
+ String expectDefault = null;
+ List<String> columnSpecs = Arrays.asList("DEFAULT", "comment", null);
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertEquals(expectDefault, actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueA() {
+ String expectDefault = "aaa";
+ List<String> columnSpecs = Arrays.asList("default", "aaa");
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertEquals(expectDefault, actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueNULL() {
+ List<String> columnSpecs = Collections.singletonList("default");
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertNull(actualDefault);
+ }
+
+ @Test
+ public void testRemoveContinuousChar() {
+ // Test removing continuous target characters from both ends
+ Assert.assertEquals("bc",
schemaManager.removeContinuousChar("aaaabcaaa", 'a'));
+ Assert.assertEquals("bcde",
schemaManager.removeContinuousChar("abcdea", 'a'));
+
+ // Test cases with no target character
+ Assert.assertEquals("abc", schemaManager.removeContinuousChar("abc",
'x'));
+
+ // Test cases with only target characters
+ Assert.assertEquals("", schemaManager.removeContinuousChar("aaaa",
'a'));
+ Assert.assertEquals("", schemaManager.removeContinuousChar("xxxxxxxx",
'x'));
+
+ // Test empty and null strings
+ Assert.assertNull(schemaManager.removeContinuousChar(null, 'a'));
+ Assert.assertEquals("", schemaManager.removeContinuousChar("", 'a'));
+
+ // Test single character strings
+ Assert.assertEquals("b", schemaManager.removeContinuousChar("b", 'a'));
+
+ // Test removing quotes
+ Assert.assertEquals("abc",
schemaManager.removeContinuousChar("\"abc\"", '\"'));
+ Assert.assertEquals("a\"bc\"d",
schemaManager.removeContinuousChar("\"a\"bc\"d\"", '\"'));
+ Assert.assertEquals("abc", schemaManager.removeContinuousChar("'abc'",
'\''));
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
new file mode 100644
index 00000000..d31ab04a
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
+
+ private SQLParserSchemaChange schemaChange;
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ JsonDebeziumChangeContext changeContext =
+ new JsonDebeziumChangeContext(
+ dorisOptions,
+ tableMapping,
+ null,
+ null,
+ null,
+ objectMapper,
+ null,
+ lineDelimiter,
+ ignoreUpdateBefore,
+ "",
+ "");
+ schemaChange = new SQLParserSchemaChange(changeContext);
+ }
+
+ @Test
+ public void testExtractDDLListMultipleColumns() throws IOException {
+ String sql0 = "ALTER TABLE `test`.`t1` DROP COLUMN `c11`";
+ String sql1 = "ALTER TABLE `test`.`t1` DROP COLUMN `c3`";
+ String sql2 = "ALTER TABLE `test`.`t1` ADD COLUMN `c12` INT DEFAULT
'100'";
+ List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2);
+
+ String record =
+
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
[...]
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List<String> ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot);
+ for (int i = 0; i < ddlSQLList.size(); i++) {
+ String srcSQL = srcSqlList.get(i);
+ String targetSQL = ddlSQLList.get(i);
+ Assert.assertEquals(srcSQL, targetSQL);
+ }
+ }
+
+ @Test
+ public void testExtractDDLListChangeColumn() throws IOException {
+ String record =
+
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"
[...]
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List<String> ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot);
+
+ String result = "ALTER TABLE `test`.`t1` RENAME COLUMN `c555` `c777`";
+ Assert.assertEquals(result, ddlSQLList.get(0));
+ }
+
+ @Test
+ public void testExtractDDLListRenameColumn() throws IOException {
+ String record =
+
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
[...]
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List<String> ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot);
+ Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c22`
`c33`", ddlSQLList.get(0));
+ }
+
+ @Test
+ public void testExtractDDlListChangeName() throws IOException {
+ String columnInfo =
+
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710925209991,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000288\",\"pos\":81654,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000288\\\",\\\"pos\\\":81654,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\
[...]
+ JsonNode record = objectMapper.readTree(columnInfo);
+ List<String> changeNameList = schemaChange.tryParserAlterDDLs(record);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` RENAME COLUMN `age` `age1`",
changeNameList.get(0));
+ }
+
+ @Test
+ public void testExtractDDlListChangeNameWithColumn() throws IOException {
+ String columnInfo =
+
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1711088321412,\"snapshot\":\"false\",\"db\":\"doris_test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000292\",\"pos\":55695,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000292\\\",\\\"pos\\\":55695,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":nu
[...]
+ JsonNode record = objectMapper.readTree(columnInfo);
+ List<String> changeNameList = schemaChange.tryParserAlterDDLs(record);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` RENAME COLUMN `key` `key_word`",
changeNameList.get(0));
+ }
+
+ @Test
+ public void testAddDatetimeColumn() throws IOException {
+ String record =
+
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720596740767,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":10192,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":10192,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
[...]
+ JsonNode recordJsonNode = objectMapper.readTree(record);
+ List<String> changeNameList =
schemaChange.tryParserAlterDDLs(recordJsonNode);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` ADD COLUMN `create_time`
DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP COMMENT 'datatime_test'",
+ changeNameList.get(0));
+ }
+
+ @Test
+ public void testDropColumn() throws IOException {
+ String record =
+
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720599133910,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":12084,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12084,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
[...]
+ JsonNode recordJsonNode = objectMapper.readTree(record);
+ List<String> changeNameList =
schemaChange.tryParserAlterDDLs(recordJsonNode);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` DROP COLUMN `create_time`",
changeNameList.get(0));
+ }
+
+ @Test
+ public void testChangeColumn() throws IOException {
+ String record =
+
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720598926291,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":11804,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":11804,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
[...]
+ JsonNode recordJsonNode = objectMapper.readTree(record);
+ List<String> changeNameList =
schemaChange.tryParserAlterDDLs(recordJsonNode);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` RENAME COLUMN `create_time2`
`create_time`",
+ changeNameList.get(0));
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 2410ddac..07744e37 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.tools.cdc;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.doris.flink.sink.schema.SchemaChangeMode;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import java.util.HashMap;
@@ -71,7 +72,8 @@ public class CdcMysqlSyncDatabaseCase {
String multiToOneOrigin = "a_.*|b_.*";
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
- boolean useNewSchemaChange = false;
+ boolean useNewSchemaChange = true;
+ String schemaChangeMode =
SchemaChangeMode.DEBEZIUM_STRUCTURE.getName();
boolean singleSink = false;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
@@ -90,6 +92,7 @@ public class CdcMysqlSyncDatabaseCase {
.setTableConfig(tableConfig)
.setCreateTableOnly(false)
.setNewSchemaChange(useNewSchemaChange)
+ .setSchemaChangeMode(schemaChangeMode)
.setSingleSink(singleSink)
.setIgnoreIncompatible(ignoreIncompatible)
.create();
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index fba5866c..35a5719a 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -76,7 +76,7 @@ public class CdcOraclelSyncDatabaseCase {
String multiToOneOrigin = "a_.*|b_.*";
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
- boolean useNewSchemaChange = false;
+ boolean useNewSchemaChange = true;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new OracleDatabaseSync();
databaseSync
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 6c933409..4d5a56f7 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -78,7 +78,7 @@ public class CdcPostgresSyncDatabaseCase {
String multiToOneOrigin = "a_.*|b_.*";
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
- boolean useNewSchemaChange = false;
+ boolean useNewSchemaChange = true;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new PostgresDatabaseSync();
databaseSync
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 9fec63b6..3d6e1e99 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -76,7 +76,7 @@ public class CdcSqlServerSyncDatabaseCase {
String multiToOneOrigin = "a_.*|b_.*";
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
- boolean useNewSchemaChange = false;
+ boolean useNewSchemaChange = true;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new SqlServerDatabaseSync();
databaseSync
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]