This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 99f2da9 [fix]Fix multi-table sink, schema change result is incorrect (#313)
99f2da9 is described below
commit 99f2da9565b0f64ba56799f1b6ae89305692cbf2
Author: wudongliang <[email protected]>
AuthorDate: Thu Feb 1 16:10:09 2024 +0800
[fix]Fix multi-table sink, schema change result is incorrect (#313)
---
.../serializer/JsonDebeziumSchemaSerializer.java | 23 ++-
.../jsondebezium/JsonDebeziumChangeUtils.java | 65 +++++++++
.../jsondebezium/JsonDebeziumDataChange.java | 41 +-----
.../jsondebezium/JsonDebeziumSchemaChange.java | 28 +---
.../jsondebezium/JsonDebeziumSchemaChangeImpl.java | 11 +-
.../JsonDebeziumSchemaChangeImplV2.java | 47 +++---
.../jsondebezium/TestJsonDebeziumDataChange.java | 18 ++-
.../TestJsonDebeziumSchemaChangeImplV2.java | 158 ++++++++++++++++++---
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 2 +
9 files changed, 284 insertions(+), 109 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 5bea2d9..d34c1a3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
@@ -36,8 +37,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.regex.Pattern;
import static
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
@@ -58,7 +61,6 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private final ObjectMapper objectMapper = new ObjectMapper();
// table name of the cdc upstream, format is db.tbl
private final String sourceTableName;
- private boolean firstLoad;
private final boolean newSchemaChange;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
@@ -71,6 +73,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private String targetTableSuffix;
private JsonDebeziumDataChange dataChange;
private JsonDebeziumSchemaChange schemaChange;
+ private final Set<String> initTableSet = new HashSet<>();
public JsonDebeziumSchemaSerializer(
DorisOptions dorisOptions,
@@ -85,7 +88,6 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
JsonNodeFactory jsonNodeFactory =
JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.newSchemaChange = newSchemaChange;
- this.firstLoad = true;
}
public JsonDebeziumSchemaSerializer(
@@ -156,13 +158,24 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return null;
}
- if (firstLoad) {
- schemaChange.init(recordRoot);
- firstLoad = false;
+ this.tableMapping = schemaChange.getTableMapping();
+ String dorisTableName =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(
+ recordRoot, dorisOptions, tableMapping);
+ if (initSchemaChange(dorisTableName)) {
+ schemaChange.init(recordRoot, dorisTableName);
}
return dataChange.serialize(record, recordRoot, op);
}
+ private boolean initSchemaChange(String dorisTableName) {
+ if (initTableSet.contains(dorisTableName)) {
+ return false;
+ }
+ initTableSet.add(dorisTableName);
+ return true;
+ }
+
private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null && !(record.get(key)
instanceof NullNode)
? record.get(key).asText()
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
new file mode 100644
index 0000000..921607d
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.util.Map;
+
+public class JsonDebeziumChangeUtils {
+
+ public static String getDorisTableIdentifier(
+ JsonNode record, DorisOptions dorisOptions, Map<String, String>
tableMapping) {
+ String identifier = getCdcTableIdentifier(record);
+ return getDorisTableIdentifier(identifier, dorisOptions, tableMapping);
+ }
+
+ public static String getDorisTableIdentifier(
+ String cdcTableIdentifier,
+ DorisOptions dorisOptions,
+ Map<String, String> tableMapping) {
+ if
(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
+ return dorisOptions.getTableIdentifier();
+ }
+ if (!CollectionUtil.isNullOrEmpty(tableMapping)
+ && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
+ && tableMapping.get(cdcTableIdentifier) != null) {
+ return tableMapping.get(cdcTableIdentifier);
+ }
+ return null;
+ }
+
+ public static String getCdcTableIdentifier(JsonNode record) {
+ String db = extractJsonNode(record.get("source"), "db");
+ String schema = extractJsonNode(record.get("source"), "schema");
+ String table = extractJsonNode(record.get("source"), "table");
+ return SourceSchema.getString(db, schema, table);
+ }
+
+ public static String extractJsonNode(JsonNode record, String key) {
+ return record != null && record.get(key) != null && !(record.get(key)
instanceof NullNode)
+ ? record.get(key).asText()
+ : null;
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
index 5790c48..67aef02 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
@@ -17,19 +17,15 @@
package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +52,7 @@ public class JsonDebeziumDataChange implements ChangeEvent {
private final DorisOptions dorisOptions;
private final boolean ignoreUpdateBefore;
private final String lineDelimiter;
- private JsonDebeziumChangeContext changeContext;
+ private final JsonDebeziumChangeContext changeContext;
public JsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {
this.changeContext = changeContext;
@@ -68,8 +64,11 @@ public class JsonDebeziumDataChange implements ChangeEvent {
public DorisRecord serialize(String record, JsonNode recordRoot, String
op) throws IOException {
// Filter out table records that are not in tableMapping
- String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
- String dorisTableIdentifier =
getDorisTableIdentifier(cdcTableIdentifier);
+ Map<String, String> tableMapping = changeContext.getTableMapping();
+ String cdcTableIdentifier =
JsonDebeziumChangeUtils.getCdcTableIdentifier(recordRoot);
+ String dorisTableIdentifier =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(
+ cdcTableIdentifier, dorisOptions, tableMapping);
if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
LOG.warn(
"filter table {}, because it is not listened, record
detail is {}",
@@ -123,34 +122,6 @@ public class JsonDebeziumDataChange implements ChangeEvent
{
return updateRow.toString().getBytes(StandardCharsets.UTF_8);
}
- @VisibleForTesting
- public String getCdcTableIdentifier(JsonNode record) {
- String db = extractJsonNode(record.get("source"), "db");
- String schema = extractJsonNode(record.get("source"), "schema");
- String table = extractJsonNode(record.get("source"), "table");
- return SourceSchema.getString(db, schema, table);
- }
-
- @VisibleForTesting
- public String getDorisTableIdentifier(String cdcTableIdentifier) {
- if
(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
- return dorisOptions.getTableIdentifier();
- }
- Map<String, String> tableMapping = changeContext.getTableMapping();
- if (!CollectionUtil.isNullOrEmpty(tableMapping)
- && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
- && tableMapping.get(cdcTableIdentifier) != null) {
- return tableMapping.get(cdcTableIdentifier);
- }
- return null;
- }
-
- private String extractJsonNode(JsonNode record, String key) {
- return record != null && record.get(key) != null && !(record.get(key)
instanceof NullNode)
- ? record.get(key).asText()
- : null;
- }
-
private Map<String, Object> extractBeforeRow(JsonNode record) {
return extractRow(record.get("before"));
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index 4c67164..f449857 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -19,7 +19,6 @@ package
org.apache.doris.flink.sink.writer.serializer.jsondebezium;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -60,7 +59,7 @@ public abstract class JsonDebeziumSchemaChange implements
ChangeEvent {
public abstract boolean schemaChange(JsonNode recordRoot);
- public abstract void init(JsonNode recordRoot);
+ public abstract void init(JsonNode recordRoot, String dorisTableName);
/** When cdc synchronizes multiple tables, it will capture multiple table
schema changes. */
protected boolean checkTable(JsonNode recordRoot) {
@@ -89,26 +88,9 @@ public abstract class JsonDebeziumSchemaChange implements
ChangeEvent {
: null;
}
- @VisibleForTesting
- public String getDorisTableIdentifier(String cdcTableIdentifier) {
- if
(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
- return dorisOptions.getTableIdentifier();
- }
- if (!CollectionUtil.isNullOrEmpty(tableMapping)
- && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
- && tableMapping.get(cdcTableIdentifier) != null) {
- return tableMapping.get(cdcTableIdentifier);
- }
- return null;
- }
-
- protected String getDorisTableIdentifier(JsonNode record) {
- String identifier = getCdcTableIdentifier(record);
- return getDorisTableIdentifier(identifier);
- }
-
protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
- String identifier = getDorisTableIdentifier(record);
+ String identifier =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(record,
dorisOptions, tableMapping);
if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
return null;
}
@@ -136,6 +118,10 @@ public abstract class JsonDebeziumSchemaChange implements
ChangeEvent {
return record;
}
+ public Map<String, String> getTableMapping() {
+ return tableMapping;
+ }
+
@VisibleForTesting
public void setSchemaChangeManager(SchemaChangeManager
schemaChangeManager) {
this.schemaChangeManager = schemaChangeManager;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
index 4cf0970..614f06a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
@@ -55,7 +55,7 @@ public class JsonDebeziumSchemaChangeImpl extends
JsonDebeziumSchemaChange {
}
@Override
- public void init(JsonNode recordRoot) {
+ public void init(JsonNode recordRoot, String dorisTableName) {
// do nothing
}
@@ -120,7 +120,14 @@ public class JsonDebeziumSchemaChangeImpl extends
JsonDebeziumSchemaChange {
String col = matcher.group(3);
String type = matcher.group(5);
type = handleType(type);
- ddl = String.format(EXECUTE_DDL,
getDorisTableIdentifier(record), op, col, type);
+ ddl =
+ String.format(
+ EXECUTE_DDL,
+
JsonDebeziumChangeUtils.getDorisTableIdentifier(
+ record, dorisOptions, tableMapping),
+ op,
+ col,
+ type);
LOG.info("parse ddl:{}", ddl);
return ddl;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index e5f6994..993b822 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -48,6 +48,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -66,13 +67,15 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
Pattern.compile(
"ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
Pattern.CASE_INSENSITIVE);
- private Map<String, FieldSchema> originFieldSchemaMap;
+ // schemaChange saves table names, field, and field column information
+ private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new
LinkedHashMap<>();
private SourceConnector sourceConnector;
// create table properties
private final Map<String, String> tableProperties;
private String targetDatabase;
private String targetTablePrefix;
private String targetTableSuffix;
+ private final Set<String> filledTables = new HashSet<>();
public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext
changeContext) {
this.addDropDDLPattern = Pattern.compile(addDropDDLRegex,
Pattern.CASE_INSENSITIVE);
@@ -95,14 +98,15 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
}
@Override
- public void init(JsonNode recordRoot) {
- originFieldSchemaMap = new LinkedHashMap<>();
+ public void init(JsonNode recordRoot, String dorisTableName) {
Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
if (CollectionUtils.isEmpty(columnNameSet)) {
columnNameSet = extractBeforeRow(recordRoot).keySet();
}
- columnNameSet.forEach(
- columnName -> originFieldSchemaMap.put(columnName, new
FieldSchema()));
+ Map<String, FieldSchema> fieldSchemaMap = new LinkedHashMap<>();
+ columnNameSet.forEach(columnName -> fieldSchemaMap.put(columnName, new
FieldSchema()));
+
+ originFieldSchemaMap.put(dorisTableName, fieldSchemaMap);
}
@Override
@@ -168,7 +172,8 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
/** Parse Alter Event. */
@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws IOException {
- String dorisTable = getDorisTableIdentifier(record);
+ String dorisTable =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(record,
dorisOptions, tableMapping);
JsonNode historyRecord = extractHistoryRecord(record);
String ddl = extractJsonNode(historyRecord, "ddl");
JsonNode tableChange = extractTableChange(record);
@@ -184,16 +189,20 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
sourceConnector =
SourceConnector.valueOf(
record.get("source").get("connector").asText().toUpperCase());
- fillOriginSchema(columns);
+ }
+ if (!filledTables.contains(dorisTable)) {
+ fillOriginSchema(dorisTable, columns);
+ filledTables.add(dorisTable);
}
+ Map<String, FieldSchema> fieldSchemaMap =
originFieldSchemaMap.get(dorisTable);
// rename ddl
Matcher renameMatcher = renameDDLPattern.matcher(ddl);
if (renameMatcher.find()) {
String oldColumnName = renameMatcher.group(2);
String newColumnName = renameMatcher.group(3);
return SchemaChangeHelper.generateRenameDDLSql(
- dorisTable, oldColumnName, newColumnName,
originFieldSchemaMap);
+ dorisTable, oldColumnName, newColumnName, fieldSchemaMap);
}
// add/drop ddl
@@ -201,7 +210,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
for (JsonNode column : columns) {
buildFieldSchema(updateFiledSchema, column);
}
- SchemaChangeHelper.compareSchema(updateFiledSchema,
originFieldSchemaMap);
+ SchemaChangeHelper.compareSchema(updateFiledSchema, fieldSchemaMap);
// In order to avoid other source table column change operations other
than add/drop/rename,
// which may lead to the accidental deletion of the doris column.
Matcher matcher = addDropDDLPattern.matcher(ddl);
@@ -330,16 +339,17 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
}
@VisibleForTesting
- public void fillOriginSchema(JsonNode columns) {
- if (Objects.nonNull(originFieldSchemaMap)) {
+ public void fillOriginSchema(String tableName, JsonNode columns) {
+ Map<String, FieldSchema> fieldSchemaMap =
originFieldSchemaMap.get(tableName);
+ if (Objects.nonNull(fieldSchemaMap)) {
for (JsonNode column : columns) {
String fieldName = column.get("name").asText();
- if (originFieldSchemaMap.containsKey(fieldName)) {
+ if (fieldSchemaMap.containsKey(fieldName)) {
String dorisTypeName = buildDorisTypeName(column);
String defaultValue =
handleDefaultValue(extractJsonNode(column,
"defaultValueExpression"));
String comment = extractJsonNode(column, "comment");
- FieldSchema fieldSchema =
originFieldSchemaMap.get(fieldName);
+ FieldSchema fieldSchema = fieldSchemaMap.get(fieldName);
fieldSchema.setName(fieldName);
fieldSchema.setTypeString(dorisTypeName);
fieldSchema.setComment(comment);
@@ -351,8 +361,10 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
"Current schema change failed! You need to ensure that "
+ "there is data in the table."
+ dorisOptions.getTableIdentifier());
- originFieldSchemaMap = new LinkedHashMap<>();
- columns.forEach(column -> buildFieldSchema(originFieldSchemaMap,
column));
+ fieldSchemaMap = new LinkedHashMap<>();
+ Map<String, FieldSchema> finalFieldSchemaMap = fieldSchemaMap;
+ columns.forEach(column -> buildFieldSchema(finalFieldSchemaMap,
column));
+ originFieldSchemaMap.put(tableName, fieldSchemaMap);
}
}
@@ -403,12 +415,13 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
}
@VisibleForTesting
- public void setOriginFieldSchemaMap(Map<String, FieldSchema>
originFieldSchemaMap) {
+ public void setOriginFieldSchemaMap(
+ Map<String, Map<String, FieldSchema>> originFieldSchemaMap) {
this.originFieldSchemaMap = originFieldSchemaMap;
}
@VisibleForTesting
- public Map<String, FieldSchema> getOriginFieldSchemaMap() {
+ public Map<String, Map<String, FieldSchema>> getOriginFieldSchemaMap() {
return originFieldSchemaMap;
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
index 8900339..4891d82 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
@@ -156,32 +156,38 @@ public class TestJsonDebeziumDataChange extends
TestJsonDebeziumChangeBase {
String insert =
"{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
10:01:02\",\"ts\":\"2022-01-01
10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"t
[...]
JsonNode recordRoot = objectMapper.readTree(insert);
- String identifier = dataChange.getCdcTableIdentifier(recordRoot);
+ String identifier =
JsonDebeziumChangeUtils.getCdcTableIdentifier(recordRoot);
Assert.assertEquals("test.t1", identifier);
String insertSchema =
"{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
10:01:02\",\"ts\":\"2022-01-01
10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"
[...]
String identifierSchema =
-
dataChange.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
+
JsonDebeziumChangeUtils.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
Assert.assertEquals("test.dbo.t1", identifierSchema);
String ddl =
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
[...]
- String ddlRes =
dataChange.getCdcTableIdentifier(objectMapper.readTree(ddl));
+ String ddlRes =
JsonDebeziumChangeUtils.getCdcTableIdentifier(objectMapper.readTree(ddl));
Assert.assertEquals("test.t1", ddlRes);
}
@Test
public void testGetDorisTableIdentifier() throws Exception {
- String identifier = dataChange.getDorisTableIdentifier("test.dbo.t1");
+ String identifier =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(
+ "test.dbo.t1", dorisOptions, tableMapping);
Assert.assertEquals("test.t1", identifier);
- identifier = dataChange.getDorisTableIdentifier("test.t1");
+ identifier =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(
+ "test.t1", dorisOptions, tableMapping);
Assert.assertEquals("test.t1", identifier);
String tmp = dorisOptions.getTableIdentifier();
dorisOptions.setTableIdentifier(null);
- identifier = dataChange.getDorisTableIdentifier("test.t1");
+ identifier =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(
+ "test.t1", dorisOptions, tableMapping);
Assert.assertNull(identifier);
dorisOptions.setTableIdentifier(tmp);
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 8aca521..4801e88 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -73,16 +73,18 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
String sql3 = "ALTER TABLE `test`.`t1` DROP COLUMN `c13`";
List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3);
- Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>();
- originFiledSchemaMap.put("c2", new FieldSchema());
- originFiledSchemaMap.put("c555", new FieldSchema());
- originFiledSchemaMap.put("c666", new FieldSchema());
- originFiledSchemaMap.put("c4", new FieldSchema());
- originFiledSchemaMap.put("c13", new FieldSchema());
-
- String record =
+ Map<String, Map<String, FieldSchema>> originFiledSchemaMap = new
LinkedHashMap<>();
+ Map<String, FieldSchema> filedSchemaMap = new LinkedHashMap<>();
+ filedSchemaMap.put("c2", new FieldSchema());
+ filedSchemaMap.put("c555", new FieldSchema());
+ filedSchemaMap.put("c666", new FieldSchema());
+ filedSchemaMap.put("c4", new FieldSchema());
+ filedSchemaMap.put("c13", new FieldSchema());
+ originFiledSchemaMap.put("test.t1", filedSchemaMap);
+
+ String record1 =
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
[...]
- JsonNode recordRoot = objectMapper.readTree(record);
+ JsonNode recordRoot = objectMapper.readTree(record1);
schemaChange.setOriginFieldSchemaMap(originFiledSchemaMap);
List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
for (int i = 0; i < ddlSQLList.size(); i++) {
@@ -157,15 +159,18 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
"test_time", new FieldSchema("test_time", "DATETIMEV2(0)",
null, null));
srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null));
+ String tableName = "db.test_fill";
schemaChange.setSourceConnector("mysql");
String columnsString =
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"
[...]
JsonNode columns = objectMapper.readTree(columnsString);
- schemaChange.fillOriginSchema(columns);
- Map<String, FieldSchema> originFieldSchemaMap =
schemaChange.getOriginFieldSchemaMap();
+ schemaChange.fillOriginSchema(tableName, columns);
+ Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+ schemaChange.getOriginFieldSchemaMap();
+ Map<String, FieldSchema> fieldSchemaMap =
originFieldSchemaMap.get(tableName);
Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
- originFieldSchemaMap.entrySet().iterator();
+ fieldSchemaMap.entrySet().iterator();
for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
FieldSchema srcFiledSchema = entry.getValue();
Entry<String, FieldSchema> originField =
originFieldSchemaIterator.next();
@@ -180,6 +185,102 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
}
}
+ @Test
+ public void testMultipleFillOriginSchema() throws IOException {
+ Map<String, Map<String, FieldSchema>> originFiledSchema = buildOriginFiledSchema();
+ String tableName1 = "db.test_fill";
+ String tableName2 = "test.t1";
+
+ schemaChange.setOriginFieldSchemaMap(originFiledSchema);
+ schemaChange.setSourceConnector("mysql");
+ String columnsString1 =
+
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"
[...]
+ JsonNode columns1 = objectMapper.readTree(columnsString1);
+
+ String columnsString2 =
+
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"10000\",\"enumValues\":[]},{\"name\":\"c2\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":nul
[...]
+ JsonNode columns2 = objectMapper.readTree(columnsString2);
+
+ schemaChange.fillOriginSchema(tableName1, columns1);
+ schemaChange.fillOriginSchema(tableName2, columns2);
+ Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+ schemaChange.getOriginFieldSchemaMap();
+
+ Map<String, FieldSchema> originSchema1 = originFieldSchemaMap.get(tableName1);
+ Map<String, FieldSchema> originSchema2 = originFieldSchemaMap.get(tableName2);
+
+ Map<String, Map<String, FieldSchema>> scrFiledSchema = buildSrcFiledSchema();
+ Map<String, FieldSchema> scrSchema1 = scrFiledSchema.get(tableName1);
+ Map<String, FieldSchema> scrSchema2 = scrFiledSchema.get(tableName2);
+
+ compareResults(originSchema1, scrSchema1);
+ compareResults(originSchema2, scrSchema2);
+ }
+
+ private void compareResults(
+ Map<String, FieldSchema> originSchema, Map<String, FieldSchema> scrSchema) {
+ Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
+ originSchema.entrySet().iterator();
+ for (Entry<String, FieldSchema> srcEntry : scrSchema.entrySet()) {
+ FieldSchema srcFiledSchema = srcEntry.getValue();
+ Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
+
+ Assert.assertEquals(srcEntry.getKey(), originField.getKey());
+ Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
+ Assert.assertEquals(
+ srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
+ Assert.assertEquals(
+ srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
+ Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
+ }
+ }
+
+ private Map<String, Map<String, FieldSchema>> buildSrcFiledSchema() {
+ String tab1 = "db.test_fill";
+ String tab2 = "test.t1";
+ Map<String, Map<String, FieldSchema>> scrFiledSchema = new LinkedHashMap<>();
+ Map<String, FieldSchema> filedSchemaMap1 = new LinkedHashMap<>();
+ filedSchemaMap1.put("id", new FieldSchema("id", "INT", null, null));
+ filedSchemaMap1.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
+ filedSchemaMap1.put("test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
+ filedSchemaMap1.put("c1", new FieldSchema("c1", "INT", "100", null));
+ scrFiledSchema.put(tab1, filedSchemaMap1);
+
+ Map<String, FieldSchema> filedSchemaMap2 = new LinkedHashMap<>();
+ filedSchemaMap2.put("id", new FieldSchema("id", "INT", "10000", null));
+ filedSchemaMap2.put("c2", new FieldSchema("c2", "INT", null, null));
+ filedSchemaMap2.put("c555", new FieldSchema("c555", "VARCHAR(300)", null, null));
+ filedSchemaMap2.put("c666", new FieldSchema("c666", "INT", "100", null));
+ filedSchemaMap2.put("c4", new FieldSchema("c4", "BIGINT", "555", null));
+ filedSchemaMap2.put("c199", new FieldSchema("c199", "INT", null, null));
+ filedSchemaMap2.put("c12", new FieldSchema("c12", "INT", "100", null));
+ scrFiledSchema.put(tab2, filedSchemaMap2);
+ return scrFiledSchema;
+ }
+
+ private Map<String, Map<String, FieldSchema>> buildOriginFiledSchema() {
+ String tab1 = "db.test_fill";
+ String tab2 = "test.t1";
+ Map<String, Map<String, FieldSchema>> originFiledSchema = new LinkedHashMap<>();
+ Map<String, FieldSchema> filedSchemaMap1 = new LinkedHashMap<>();
+ filedSchemaMap1.put("id", new FieldSchema());
+ filedSchemaMap1.put("name", new FieldSchema());
+ filedSchemaMap1.put("test_time", new FieldSchema());
+ filedSchemaMap1.put("c1", new FieldSchema());
+ originFiledSchema.put(tab1, filedSchemaMap1);
+
+ Map<String, FieldSchema> filedSchemaMap2 = new LinkedHashMap<>();
+ filedSchemaMap2.put("id", new FieldSchema());
+ filedSchemaMap2.put("c2", new FieldSchema());
+ filedSchemaMap2.put("c555", new FieldSchema());
+ filedSchemaMap2.put("c666", new FieldSchema());
+ filedSchemaMap2.put("c4", new FieldSchema());
+ filedSchemaMap2.put("c199", new FieldSchema());
+ filedSchemaMap2.put("c12", new FieldSchema());
+ originFiledSchema.put(tab2, filedSchemaMap2);
+ return originFiledSchema;
+ }
+
@Test
public void testBuildMysql2DorisTypeName() throws IOException {
String columnInfo =
@@ -204,14 +305,16 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
public void testExtractDDLListRename() throws IOException {
String columnInfo =
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\
[...]
- Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap();
+ Map<String, Map<String, FieldSchema>> originFieldSchemaHashMap = new LinkedHashMap<>();
+ Map<String, FieldSchema> fieldSchemaHashMap = Maps.newHashMap();
JsonNode record = objectMapper.readTree(columnInfo);
schemaChange.setSourceConnector("mysql");
- originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
- originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
- originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
- schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap);
+ fieldSchemaHashMap.put("id", new FieldSchema("id", "INT", "", ""));
+ fieldSchemaHashMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+ fieldSchemaHashMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+ originFieldSchemaHashMap.put("test.t1", fieldSchemaHashMap);
+ schemaChange.setOriginFieldSchemaMap(originFieldSchemaHashMap);
List<String> ddlList = schemaChange.extractDDLList(record);
Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c3` `c333`", ddlList.get(0));
@@ -239,15 +342,21 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
@Test
public void testGetDorisTableIdentifier() throws Exception {
- String identifier = schemaChange.getDorisTableIdentifier("test.dbo.t1");
+ String identifier =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(
+ "test.dbo.t1", dorisOptions, tableMapping);
Assert.assertEquals("test.t1", identifier);
- identifier = schemaChange.getDorisTableIdentifier("test.t1");
+ identifier =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(
+ "test.t1", dorisOptions, tableMapping);
Assert.assertEquals("test.t1", identifier);
String tmp = dorisOptions.getTableIdentifier();
dorisOptions.setTableIdentifier(null);
- identifier = schemaChange.getDorisTableIdentifier("test.t1");
+ identifier =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(
+ "test.t1", dorisOptions, tableMapping);
Assert.assertNull(identifier);
dorisOptions.setTableIdentifier(tmp);
}
@@ -293,15 +402,18 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa
"test_ts_6",
new FieldSchema("test_ts_6", "DATETIMEV2(6)", "current_timestamp", null));
+ String tableName = "db.test_fill";
schemaChange.setSourceConnector("mysql");
String columnsString =
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValu
[...]
JsonNode columns = objectMapper.readTree(columnsString);
- schemaChange.fillOriginSchema(columns);
- Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap();
+ schemaChange.fillOriginSchema(tableName, columns);
+ Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+ schemaChange.getOriginFieldSchemaMap();
+ Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(tableName);
Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
- originFieldSchemaMap.entrySet().iterator();
+ fieldSchemaMap.entrySet().iterator();
for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
FieldSchema srcFiledSchema = entry.getValue();
Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 3cc9db8..88176e8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -72,6 +72,7 @@ public class CdcMysqlSyncDatabaseCase {
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
+ boolean singleSink = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync
.setEnv(env)
@@ -88,6 +89,7 @@ public class CdcMysqlSyncDatabaseCase {
.setTableConfig(tableConfig)
.setCreateTableOnly(false)
.setNewSchemaChange(useNewSchemaChange)
+ .setSingleSink(singleSink)
.create();
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]