This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new dbed8562 [Feature] support auto create table for shard table (#564)
dbed8562 is described below
commit dbed856246f2d035df544381c4d6208516810d0d
Author: wudi <[email protected]>
AuthorDate: Wed Feb 26 10:13:08 2025 +0800
[Feature] support auto create table for shard table (#564)
---
.../flink/catalog/doris/DorisSchemaFactory.java | 6 +-
.../serializer/JsonDebeziumSchemaSerializer.java | 18 +++++-
.../jsondebezium/JsonDebeziumChangeContext.java | 43 +++++++++++++
.../jsondebezium/JsonDebeziumSchemaChange.java | 10 ++-
.../JsonDebeziumSchemaChangeImplV2.java | 1 +
.../jsondebezium/SQLParserSchemaChange.java | 1 +
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 50 +--------------
.../flink/tools/cdc/ParsingProcessFunction.java | 5 +-
.../tools/cdc/converter/TableNameConverter.java | 73 ++++++++++++++++++++++
.../cdc/mongodb/MongoParsingProcessFunction.java | 2 +-
.../TestJsonDebeziumSchemaChangeImplV2.java | 60 ++++++++++++++++++
11 files changed, 212 insertions(+), 57 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
index dd42f803..07b05e5e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
@@ -255,10 +255,12 @@ public class DorisSchemaFactory {
}
public static String quoteDefaultValue(String defaultValue) {
- // DEFAULT current_timestamp not need quote
- if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+ // DEFAULT current_timestamp or NULL does not need to be quoted
+ if (defaultValue.equalsIgnoreCase("current_timestamp")
+ || defaultValue.equalsIgnoreCase("null")) {
return defaultValue;
}
+
return "'" + defaultValue + "'";
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 9c89fce3..93b7efcd 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -38,6 +38,7 @@ import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSc
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.SQLParserSchemaChange;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +82,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private JsonDebeziumDataChange dataChange;
private JsonDebeziumSchemaChange schemaChange;
private SchemaChangeMode schemaChangeMode;
+ private TableNameConverter tableNameConverter;
private final Set<String> initTableSet = new HashSet<>();
public JsonDebeziumSchemaSerializer(
@@ -127,7 +129,8 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
String targetDatabase,
String targetTablePrefix,
String targetTableSuffix,
- SchemaChangeMode schemaChangeMode) {
+ SchemaChangeMode schemaChangeMode,
+ TableNameConverter tableNameConverter) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange,
executionOptions);
this.tableMapping = tableMapping;
this.targetDatabase = targetDatabase;
@@ -135,6 +138,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
this.targetTableSuffix = targetTableSuffix;
this.schemaChangeMode = schemaChangeMode;
this.dorisTableConfig = dorisTableConfig;
+ this.tableNameConverter = tableNameConverter;
init();
}
@@ -152,7 +156,8 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
ignoreUpdateBefore,
targetTablePrefix,
targetTableSuffix,
- enableDelete);
+ enableDelete,
+ tableNameConverter);
initSchemaChangeInstance(changeContext);
this.dataChange = new JsonDebeziumDataChange(changeContext);
}
@@ -233,6 +238,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private String targetDatabase;
private String targetTablePrefix = "";
private String targetTableSuffix = "";
+ private TableNameConverter tableNameConverter;
public JsonDebeziumSchemaSerializer.Builder
setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
@@ -302,6 +308,11 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return this;
}
+ public Builder setTableNameConverter(TableNameConverter
tableNameConverter) {
+ this.tableNameConverter = tableNameConverter;
+ return this;
+ }
+
public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(
dorisOptions,
@@ -314,7 +325,8 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
targetDatabase,
targetTablePrefix,
targetTableSuffix,
- schemaChangeMode);
+ schemaChangeMode,
+ tableNameConverter);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index 2f7764f3..c9811189 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -17,9 +17,12 @@
package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+import org.apache.flink.annotation.VisibleForTesting;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
import java.io.Serializable;
import java.util.HashMap;
@@ -44,6 +47,7 @@ public class JsonDebeziumChangeContext implements
Serializable {
private final boolean enableDelete;
private final String targetTablePrefix;
private final String targetTableSuffix;
+ private TableNameConverter tableNameConverter;
public JsonDebeziumChangeContext(
DorisOptions dorisOptions,
@@ -72,6 +76,36 @@ public class JsonDebeziumChangeContext implements
Serializable {
this.targetTableSuffix = targetTableSuffix;
}
+ public JsonDebeziumChangeContext(
+ DorisOptions dorisOptions,
+ Map<String, String> tableMapping,
+ String sourceTableName,
+ String targetDatabase,
+ DorisTableConfig dorisTableConfig,
+ ObjectMapper objectMapper,
+ Pattern pattern,
+ String lineDelimiter,
+ boolean ignoreUpdateBefore,
+ String targetTablePrefix,
+ String targetTableSuffix,
+ boolean enableDelete,
+ TableNameConverter tableNameConverter) {
+ this(
+ dorisOptions,
+ tableMapping,
+ sourceTableName,
+ targetDatabase,
+ dorisTableConfig,
+ objectMapper,
+ pattern,
+ lineDelimiter,
+ ignoreUpdateBefore,
+ targetTablePrefix,
+ targetTableSuffix,
+ enableDelete);
+ this.tableNameConverter = tableNameConverter;
+ }
+
public DorisOptions getDorisOptions() {
return dorisOptions;
}
@@ -119,6 +153,10 @@ public class JsonDebeziumChangeContext implements
Serializable {
return targetTableSuffix;
}
+ public TableNameConverter getTableNameConverter() {
+ return tableNameConverter;
+ }
+
public boolean enableDelete() {
return enableDelete;
}
@@ -126,4 +164,9 @@ public class JsonDebeziumChangeContext implements
Serializable {
public DorisTableConfig getDorisTableConf() {
return dorisTableConfig;
}
+
+ @VisibleForTesting
+ public void setTableNameConverter(TableNameConverter tableNameConverter) {
+ this.tableNameConverter = tableNameConverter;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index 91be21fa..16757d27 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -33,6 +33,7 @@ import org.apache.doris.flink.sink.writer.EventType;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +72,7 @@ public abstract class JsonDebeziumSchemaChange extends
CdcSchemaChange {
protected String targetTablePrefix;
protected String targetTableSuffix;
protected DorisTableConfig dorisTableConfig;
+ protected TableNameConverter tableNameConverter;
public abstract boolean schemaChange(JsonNode recordRoot);
@@ -196,7 +198,13 @@ public abstract class JsonDebeziumSchemaChange extends
CdcSchemaChange {
protected String getCreateTableIdentifier(JsonNode record) {
String table = extractJsonNode(record.get("source"), "table");
- return targetDatabase + "." + targetTablePrefix + table +
targetTableSuffix;
+ String createTblName;
+ if (tableNameConverter != null) {
+ createTblName = tableNameConverter.convert(table);
+ } else {
+ createTblName = targetTablePrefix + table + targetTableSuffix;
+ }
+ return targetDatabase + "." + createTblName;
}
public Map<String, String> getTableMapping() {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 0b9172e8..abd6f55b 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -88,6 +88,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
changeContext.getTargetTableSuffix() == null
? ""
: changeContext.getTargetTableSuffix();
+ this.tableNameConverter = changeContext.getTableNameConverter();
}
@Override
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index fd10ba53..eda223d6 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -53,6 +53,7 @@ public class SQLParserSchemaChange extends
JsonDebeziumSchemaChange {
changeContext.getTargetTableSuffix() == null
? ""
: changeContext.getTargetTableSuffix();
+ this.tableNameConverter = changeContext.getTableNameConverter();
}
@Override
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index d6c69c0b..06e49e24 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -42,11 +42,11 @@ import org.apache.doris.flink.sink.writer.WriteMode;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import
org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.table.DorisConfigOptions;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
@@ -343,6 +343,7 @@ public abstract class DatabaseSync {
.setTargetDatabase(database)
.setTargetTablePrefix(tablePrefix)
.setTargetTableSuffix(tableSuffix)
+ .setTableNameConverter(converter)
.build();
}
@@ -618,51 +619,4 @@ public abstract class DatabaseSync {
this.tableSuffix = tableSuffix;
return this;
}
-
- public static class TableNameConverter implements Serializable {
- private static final long serialVersionUID = 1L;
- private final String prefix;
- private final String suffix;
- private Map<Pattern, String> multiToOneRulesPattern;
-
- TableNameConverter() {
- this("", "");
- }
-
- TableNameConverter(String prefix, String suffix) {
- this.prefix = prefix == null ? "" : prefix;
- this.suffix = suffix == null ? "" : suffix;
- }
-
- TableNameConverter(
- String prefix, String suffix, Map<Pattern, String>
multiToOneRulesPattern) {
- this.prefix = prefix == null ? "" : prefix;
- this.suffix = suffix == null ? "" : suffix;
- this.multiToOneRulesPattern = multiToOneRulesPattern;
- }
-
- public String convert(String tableName) {
- if (multiToOneRulesPattern == null) {
- return prefix + tableName + suffix;
- }
-
- String target = null;
-
- for (Map.Entry<Pattern, String> patternStringEntry :
- multiToOneRulesPattern.entrySet()) {
- if (patternStringEntry.getKey().matcher(tableName).matches()) {
- target = patternStringEntry.getValue();
- }
- }
- /**
- * If multiToOneRulesPattern is not null and target is not
assigned, then the
- * synchronization task contains both multi to one and one to one
, prefixes and
- * suffixes are added to common one-to-one mapping tables
- */
- if (target == null) {
- return prefix + tableName + suffix;
- }
- return target;
- }
- }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
index 22e2b9bc..8315f2da 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
@@ -26,6 +26,7 @@ import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
import java.util.HashMap;
import java.util.Map;
@@ -33,10 +34,10 @@ import java.util.Map;
public class ParsingProcessFunction extends ProcessFunction<String, Void> {
protected ObjectMapper objectMapper = new ObjectMapper();
private transient Map<String, OutputTag<String>> recordOutputTags;
- private DatabaseSync.TableNameConverter converter;
+ private TableNameConverter converter;
private String database;
- public ParsingProcessFunction(String database,
DatabaseSync.TableNameConverter converter) {
+ public ParsingProcessFunction(String database, TableNameConverter
converter) {
this.database = database;
this.converter = converter;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/converter/TableNameConverter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/converter/TableNameConverter.java
new file mode 100644
index 00000000..a9df452c
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/converter/TableNameConverter.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.converter;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/*
+ * Converts the table name of the upstream data source into the corresponding
+ * table name in the Doris database.
+ */
+public class TableNameConverter implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final String prefix;
+ private final String suffix;
+
+ // Route rules mapping a table-name regex to a target table name,
+ // e.g. pattern "tbl_.*" -> "tbl"
+ private Map<Pattern, String> routeRules;
+
+ public TableNameConverter() {
+ this("", "");
+ }
+
+ public TableNameConverter(String prefix, String suffix) {
+ this.prefix = prefix == null ? "" : prefix;
+ this.suffix = suffix == null ? "" : suffix;
+ }
+
+ public TableNameConverter(String prefix, String suffix, Map<Pattern,
String> routeRules) {
+ this.prefix = prefix == null ? "" : prefix;
+ this.suffix = suffix == null ? "" : suffix;
+ this.routeRules = routeRules;
+ }
+
+ public String convert(String tableName) {
+ if (routeRules == null) {
+ return prefix + tableName + suffix;
+ }
+
+ String target = null;
+
+ for (Map.Entry<Pattern, String> patternStringEntry :
routeRules.entrySet()) {
+ if (patternStringEntry.getKey().matcher(tableName).matches()) {
+ target = patternStringEntry.getValue();
+ }
+ }
+ /**
+ * If routeRules is not null but no rule matched (target is still
+ * unassigned), the synchronization task mixes multi-to-one and one-to-one
+ * mappings; the prefix and suffix are applied only to the ordinary
+ * one-to-one mapping tables.
+ */
+ if (target == null) {
+ return prefix + tableName + suffix;
+ }
+ return target;
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
index 72c61567..704af781 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
@@ -19,8 +19,8 @@ package org.apache.doris.flink.tools.cdc.mongodb;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
-import org.apache.doris.flink.tools.cdc.DatabaseSync.TableNameConverter;
import org.apache.doris.flink.tools.cdc.ParsingProcessFunction;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 39241f94..fc3f6ffb 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -28,6 +28,7 @@ import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -42,6 +43,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.regex.Pattern;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mockStatic;
@@ -50,7 +52,10 @@ import static org.mockito.Mockito.mockStatic;
public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBase {
private JsonDebeziumSchemaChangeImplV2 schemaChange;
+ private JsonDebeziumSchemaChangeImplV2 schemaChangeWithConvert;
private JsonDebeziumChangeContext changeContext;
+ private JsonDebeziumChangeContext convertContext;
+
private MockedStatic<RestService> mockRestService;
@Before
@@ -86,6 +91,23 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
"",
true);
schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
+
+ convertContext =
+ new JsonDebeziumChangeContext(
+ dorisOptions,
+ tableMapping,
+ sourceTableName,
+ targetDatabase,
+ new DorisTableConfig(tableProperties),
+ objectMapper,
+ null,
+ lineDelimiter,
+ ignoreUpdateBefore,
+ "ods_",
+ "_dt",
+ true,
+ new TableNameConverter("ods_", "_dt"));
+ schemaChangeWithConvert = new
JsonDebeziumSchemaChangeImplV2(convertContext);
}
@Test
@@ -489,6 +511,44 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName);
}
+ @Test
+ public void testAutoCreateTableWithConvert() throws Exception {
+ String record =
+ "{ \"source\":{ \"version\":\"1.9.7.Final\",
\"connector\":\"oracle\", \"name\":\"oracle_logminer\",
\"ts_ms\":1696945825065, \"snapshot\":\"true\",
\"db\":\"TESTDB\", \"sequence\":null, \"schema\":\"ADMIN\",
\"table\":\"PERSONS\", \"txId\":null, \"scn\":\"1199617\",
\"commit_scn\":null, \"lcr_position\":null, \"rs_id\":null,
\"ssn\":0, \"redo_thread\":null [...]
+ JsonNode recordRoot = objectMapper.readTree(record);
+
schemaChangeWithConvert.setSourceConnector(SourceConnector.ORACLE.connectorName);
+ TableSchema tableSchema =
schemaChangeWithConvert.extractCreateTableSchema(recordRoot);
+ Assert.assertEquals("TESTDB", tableSchema.getDatabase());
+ Assert.assertEquals("ods_PERSONS_dt", tableSchema.getTable());
+ Assert.assertArrayEquals(new String[] {"ID"},
tableSchema.getKeys().toArray());
+ Assert.assertEquals(3, tableSchema.getFields().size());
+ Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName());
+ Assert.assertEquals("NAME4",
tableSchema.getFields().get("NAME4").getName());
+ Assert.assertEquals("age4",
tableSchema.getFields().get("age4").getName());
+
+ // match table
+ Map<Pattern, String> tableConvert = new HashMap<>();
+ tableConvert.put(Pattern.compile("PER.*"), "PERSONS_RES");
+
+ convertContext.setTableNameConverter(
+ new TableNameConverter("prefix_", "_suffix", tableConvert));
+ schemaChangeWithConvert = new
JsonDebeziumSchemaChangeImplV2(convertContext);
+ TableSchema tableSchema2 =
schemaChangeWithConvert.extractCreateTableSchema(recordRoot);
+ Assert.assertEquals("TESTDB", tableSchema2.getDatabase());
+ Assert.assertEquals("PERSONS_RES", tableSchema2.getTable());
+
+ // no match table
+ Map<Pattern, String> tableConvert2 = new HashMap<>();
+ tableConvert2.put(Pattern.compile("NOMATCH.*"), "PERSONS_RES");
+ convertContext.setTableNameConverter(new TableNameConverter("pre_",
"_suf", tableConvert2));
+ schemaChangeWithConvert = new
JsonDebeziumSchemaChangeImplV2(convertContext);
+ TableSchema tableSchema3 =
schemaChangeWithConvert.extractCreateTableSchema(recordRoot);
+ Assert.assertEquals("TESTDB", tableSchema3.getDatabase());
+ Assert.assertEquals("pre_PERSONS_suf", tableSchema3.getTable());
+
+
schemaChangeWithConvert.setSourceConnector(SourceConnector.MYSQL.connectorName);
+ }
+
@Test
public void testDateTimeFullOrigin() throws JsonProcessingException {
Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new
LinkedHashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]