This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 734662e [Feature](CDC)Support database sync use single sink (#245)
734662e is described below
commit 734662e18339965ab522957729c97cebc655cfe1
Author: wudi <[email protected]>
AuthorDate: Wed Nov 29 14:37:21 2023 +0800
[Feature](CDC)Support database sync use single sink (#245)
---
flink-doris-connector/pom.xml | 2 +-
.../flink/sink/writer/serializer/DorisRecord.java | 12 ++
.../serializer/JsonDebeziumSchemaSerializer.java | 120 +++++++++++++++---
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 17 ++-
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 140 ++++++++++++++++-----
.../cdc/{mysql => }/ParsingProcessFunction.java | 3 +-
.../apache/doris/flink/tools/cdc/SourceSchema.java | 24 ++++
.../writer/TestJsonDebeziumSchemaSerializer.java | 58 +++++++++
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 16 ++-
.../tools/cdc/CdcOraclelSyncDatabaseCase.java | 16 ++-
.../tools/cdc/CdcPostgresSyncDatabaseCase.java | 16 ++-
.../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 16 ++-
12 files changed, 387 insertions(+), 53 deletions(-)
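In place of the old positional DatabaseSync.create(...) call, the patch below introduces fluent setters plus a single-sink mode that routes every captured table through one DorisSink. A minimal sketch of a MySQL sync job on the new API might look as follows; the classes and setters are the ones added in this commit, while the configuration keys, placeholder values, and the surrounding main method are illustrative assumptions rather than a tested invocation:

    import java.util.HashMap;

    import org.apache.doris.flink.tools.cdc.DatabaseSync;
    import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SingleSinkSyncSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // MySQL CDC source options (placeholder values).
            Configuration mysqlConfig = new Configuration();
            mysqlConfig.setString("hostname", "127.0.0.1");
            mysqlConfig.setString("username", "root");
            mysqlConfig.setString("password", "");
            mysqlConfig.setString("database-name", "test_db");

            // Doris sink options (placeholder values).
            Configuration sinkConfig = new Configuration();
            sinkConfig.setString("fenodes", "127.0.0.1:8030");
            sinkConfig.setString("username", "root");
            sinkConfig.setString("password", "");

            DatabaseSync databaseSync = new MysqlDatabaseSync();
            databaseSync.setEnv(env)
                    .setDatabase("test_db")
                    .setConfig(mysqlConfig)
                    .setIncludingTables(".*")
                    .setIgnoreDefaultValue(false)
                    .setSinkConfig(sinkConfig)
                    .setTableConfig(new HashMap<>())
                    .setCreateTableOnly(false)
                    .setNewSchemaChange(true)
                    .setSingleSink(true) // new in this commit: one DorisSink for the whole database
                    .create();
            databaseSync.build();
            env.execute("MySQL-Doris Database Sync: test_db");
        }
    }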
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 13a7ec4..54dc7ba 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -70,7 +70,7 @@ under the License.
<revision>1.5.0-SNAPSHOT</revision>
<flink.version>1.18.0</flink.version>
<flink.major.version>1.18</flink.major.version>
- <flink.sql.cdc.version>2.4.1</flink.sql.cdc.version>
+ <flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>5.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
index 02ae3bc..fc2f00c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
@@ -67,6 +67,18 @@ public class DorisRecord implements Serializable {
return new DorisRecord(database, table, row);
}
+ public static DorisRecord of(String tableIdentifier, byte[] row) {
+ if(tableIdentifier != null) {
+ String[] dbTbl = tableIdentifier.split("\\.");
+ if(dbTbl.length == 2){
+ String database = dbTbl[0];
+ String table = dbTbl[1];
+ return new DorisRecord(database, table, row);
+ }
+ }
+ return null;
+ }
+
public static DorisRecord of(byte[] row) {
return new DorisRecord(null, null, row);
}
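A brief note on the overload added above: DorisRecord.of(String, byte[]) accepts only a two-part "db.table" identifier and returns null for anything else, so callers are expected to null-check. A small illustrative fragment (values made up):

    // Fragment only: assumes java.nio.charset.StandardCharsets and DorisRecord are imported.
    byte[] row = "{\"id\":1,\"name\":\"doris\"}".getBytes(StandardCharsets.UTF_8);

    DorisRecord ok = DorisRecord.of("db1.tbl1", row);  // database = "db1", table = "tbl1"
    DorisRecord skipped = DorisRecord.of("tbl1", row); // identifier does not split into two parts -> null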
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 4dc654e..d87da4c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -33,12 +33,14 @@ import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +88,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private boolean ignoreUpdateBefore = true;
private SourceConnector sourceConnector;
private SchemaChangeManager schemaChangeManager;
+ // <cdc db.schema.table, doris db.table>
+ private Map<String, String> tableMapping;
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
@@ -93,9 +97,11 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
        this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
- String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
- this.database = tableInfo[0];
- this.table = tableInfo[1];
+        if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
+            String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
+ this.database = tableInfo[0];
+ this.table = tableInfo[1];
+ }
this.sourceTableName = sourceTableName;
// Prevent loss of decimal data precision
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
@@ -120,10 +126,29 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
}
}
+ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
+ Pattern pattern,
+ String sourceTableName,
+ boolean newSchemaChange,
+ DorisExecutionOptions executionOptions,
+ Map<String, String> tableMapping) {
+        this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
+ this.tableMapping = tableMapping;
+ }
+
@Override
public DorisRecord serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+
+ //Filter out table records that are not in tableMapping
+ String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+        String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
+ if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
+            LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
+ return null;
+ }
+
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
@@ -146,7 +171,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
addDeleteSign(valueMap, false);
break;
case OP_UPDATE:
- return DorisRecord.of(extractUpdate(recordRoot));
+                return DorisRecord.of(dorisTableIdentifier, extractUpdate(recordRoot));
case OP_DELETE:
valueMap = extractBeforeRow(recordRoot);
addDeleteSign(valueMap, true);
@@ -155,7 +180,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
                LOG.error("parse record fail, unknown op {} in {}", op, record);
return null;
}
-        return DorisRecord.of(objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
+
+        return DorisRecord.of(dorisTableIdentifier, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
}
/**
@@ -187,6 +213,13 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
        if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
+
+ // db,table
+ Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+ if(tuple == null){
+ return false;
+ }
+
List<String> ddlSqlList = extractDDLList(recordRoot);
if (CollectionUtils.isEmpty(ddlSqlList)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
@@ -197,8 +230,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
for (int i = 0; i < ddlSqlList.size(); i++) {
DDLSchema ddlSchema = ddlSchemas.get(i);
String ddlSql = ddlSqlList.get(i);
- boolean doSchemaChange = checkSchemaChange(ddlSchema);
-            status = doSchemaChange && schemaChangeManager.execute(ddlSql, database);
+            boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
+            status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
}
} catch (Exception ex) {
@@ -209,6 +242,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
@VisibleForTesting
    public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
+ String dorisTable = getDorisTableIdentifier(record);
JsonNode historyRecord = extractHistoryRecord(record);
JsonNode tableChanges = historyRecord.get("tableChanges");
String ddl = extractJsonNode(historyRecord, "ddl");
@@ -233,7 +267,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
String oldColumnName = renameMatcher.group(2);
String newColumnName = renameMatcher.group(3);
return SchemaChangeHelper.generateRenameDDLSql(
-                    dorisOptions.getTableIdentifier(), oldColumnName, newColumnName, originFieldSchemaMap);
+                    dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
}
// add/drop ddl
@@ -248,7 +282,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
if (!matcher.find()) {
return null;
}
-        return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
+ return SchemaChangeHelper.generateDDLSql(dorisTable);
}
@VisibleForTesting
@@ -262,13 +296,20 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
        if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
+ // db,table
+ Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+ if(tuple == null){
+ return false;
+ }
+
String ddl = extractDDL(recordRoot);
if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
- boolean doSchemaChange = checkSchemaChange(ddl);
-        status = doSchemaChange && schemaChangeManager.execute(ddl, database);
+
+        boolean doSchemaChange = checkSchemaChange(ddl, tuple.f0, tuple.f1);
+        status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0);
LOG.info("schema change status:{}", status);
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
@@ -286,12 +327,48 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
return sourceTableName.equals(dbTbl);
}
-    private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
+ public String getCdcTableIdentifier(JsonNode record){
+ String db = extractJsonNode(record.get("source"), "db");
+ String schema = extractJsonNode(record.get("source"), "schema");
+ String table = extractJsonNode(record.get("source"), "table");
+ return SourceSchema.getString(db, schema, table);
+ }
+
+ public String getDorisTableIdentifier(String cdcTableIdentifier){
+        if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
+ return dorisOptions.getTableIdentifier();
+ }
+ if(!CollectionUtil.isNullOrEmpty(tableMapping)
+ && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
+ && tableMapping.get(cdcTableIdentifier) != null){
+ return tableMapping.get(cdcTableIdentifier);
+ }
+ return null;
+ }
+
+ protected String getDorisTableIdentifier(JsonNode record){
+ String identifier = getCdcTableIdentifier(record);
+ return getDorisTableIdentifier(identifier);
+ }
+
+ protected Tuple2<String, String> getDorisTableTuple(JsonNode record){
+ String identifier = getDorisTableIdentifier(record);
+ if(StringUtils.isNullOrWhitespaceOnly(identifier)){
+ return null;
+ }
+ String[] tableInfo = identifier.split("\\.");
+ if(tableInfo.length != 2){
+ return null;
+ }
+ return Tuple2.of(tableInfo[0], tableInfo[1]);
+ }
+
+    private boolean checkSchemaChange(String database, String table, String ddl) throws IOException, IllegalArgumentException {
Map<String, Object> param = buildRequestParam(ddl);
return schemaChangeManager.checkSchemaChange(database, table, param);
}
-    private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
+    private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
        Map<String, Object> param = SchemaChangeManager.buildRequestParam(ddlSchema.isDropColumn(), ddlSchema.getColumnName());
return schemaChangeManager.checkSchemaChange(database, table, param);
}
@@ -328,8 +405,6 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
return extractJsonNode(record.get("source"), "table");
}
-
-
private String extractJsonNode(JsonNode record, String key) {
        return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode) ?
                record.get(key).asText() : null;
@@ -369,7 +444,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
String col = matcher.group(3);
String type = matcher.group(5);
type = handleType(type);
-            ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type);
+            ddl = String.format(EXECUTE_DDL, getDorisTableIdentifier(record), op, col, type);
LOG.info("parse ddl:{}", ddl);
return ddl;
}
@@ -475,6 +550,11 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
        this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase());
}
+ @VisibleForTesting
+ public void setTableMapping(Map<String, String> tableMapping) {
+ this.tableMapping = tableMapping;
+ }
+
public static JsonDebeziumSchemaSerializer.Builder builder() {
return new JsonDebeziumSchemaSerializer.Builder();
}
@@ -488,6 +568,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private String sourceTableName;
private boolean newSchemaChange;
private DorisExecutionOptions executionOptions;
+ private Map<String, String> tableMapping;
        public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
@@ -514,9 +595,14 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
return this;
}
+ public Builder setTableMapping(Map<String, String> tableMapping) {
+ this.tableMapping = tableMapping;
+ return this;
+ }
+
public JsonDebeziumSchemaSerializer build() {
            return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
- executionOptions);
+ executionOptions, tableMapping);
}
}
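Tying the serializer changes together: in single-sink mode DorisOptions carries no table identifier, so each Debezium record is resolved through tableMapping, keyed by the CDC-side db[.schema].table and valued by the Doris db.table (records for unmapped tables are filtered out in serialize()). A rough wiring sketch, assuming a DorisOptions and DorisExecutionOptions have already been built and using made-up table names:

    Map<String, String> tableMapping = new HashMap<>();
    tableMapping.put("mysql_db.orders", "doris_db.orders"); // cdc identifier -> doris identifier
    tableMapping.put("mysql_db.users", "doris_db.users");

    JsonDebeziumSchemaSerializer serializer = JsonDebeziumSchemaSerializer.builder()
            .setDorisOptions(dorisOptions)          // built without setTableIdentifier(...) in single-sink mode
            .setNewSchemaChange(true)
            .setExecutionOptions(executionOptions)
            .setTableMapping(tableMapping)
            .build();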
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 8a8b3db..d05fa15 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -108,13 +108,28 @@ public class CdcTools {
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
boolean useNewSchemaChange = params.has("use-new-schema-change");
+ boolean singleSink = params.has("single-sink");
Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables,multiToOneOrigin,multiToOneTarget, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConfig)
+ .setTableConfig(tableMap)
+ .setCreateTableOnly(createTableOnly)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
            jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db"));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index f1579b0..4dfa6d5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -25,7 +25,6 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.table.DorisConfigOptions;
-import org.apache.doris.flink.tools.cdc.mysql.ParsingProcessFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -60,7 +59,7 @@ public abstract class DatabaseSync {
protected Pattern includingPattern;
protected Pattern excludingPattern;
protected Map<Pattern, String> multiToOneRulesPattern;
- protected Map<String, String> tableConfig;
+ protected Map<String, String> tableConfig = new HashMap<>();
protected Configuration sinkConfig;
protected boolean ignoreDefaultValue;
@@ -71,6 +70,10 @@ public abstract class DatabaseSync {
protected String excludingTables;
protected String multiToOneOrigin;
protected String multiToOneTarget;
+ protected String tablePrefix;
+ protected String tableSuffix;
+ protected boolean singleSink;
+ private Map<String, String> tableMapping = new HashMap<>();
public abstract void registerDriver() throws SQLException;
@@ -84,30 +87,15 @@ public abstract class DatabaseSync {
registerDriver();
}
-    public void create(StreamExecutionEnvironment env, String database, Configuration config,
-                       String tablePrefix, String tableSuffix, String includingTables,
-                       String excludingTables,String multiToOneOrigin,String multiToOneTarget, boolean ignoreDefaultValue, Configuration sinkConfig,
-                       Map<String, String> tableConfig, boolean createTableOnly, boolean useNewSchemaChange) {
- this.env = env;
- this.config = config;
- this.database = database;
- this.includingTables = includingTables;
- this.excludingTables = excludingTables;
- this.multiToOneOrigin = multiToOneOrigin;
- this.multiToOneTarget = multiToOneTarget;
+ public void create() {
        this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
        this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
        this.multiToOneRulesPattern = multiToOneRulesParser(multiToOneOrigin,multiToOneTarget);
        this.converter = new TableNameConverter(tablePrefix, tableSuffix,multiToOneRulesPattern);
- this.ignoreDefaultValue = ignoreDefaultValue;
- this.sinkConfig = sinkConfig;
- this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
//default enable light schema change
if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){
this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
}
- this.createTableOnly = createTableOnly;
- this.newSchemaChange = useNewSchemaChange;
}
public void build() throws Exception {
@@ -125,7 +113,10 @@ public abstract class DatabaseSync {
List<String> dorisTables = new ArrayList<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
- String dorisTable=converter.convert(schema.getTableName());
+ String dorisTable = converter.convert(schema.getTableName());
+
+            //Calculate the mapping relationship between upstream and downstream tables
+            tableMapping.put(schema.getTableIdentifier(), String.format("%s.%s", database, dorisTable));
if (!dorisSystem.tableExists(database, dorisTable)) {
                TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
//set doris target database
@@ -144,12 +135,16 @@ public abstract class DatabaseSync {
        config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")");
DataStreamSource<String> streamSource = buildCdcSource(env);
-        SingleOutputStreamOperator<Void> parsedStream = streamSource.process(new ParsingProcessFunction(converter));
-        for (String table : dorisTables) {
-            OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
-            DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
-            int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
-            sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
+        if(singleSink){
+            streamSource.sinkTo(buildDorisSink());
+        }else{
+            SingleOutputStreamOperator<Void> parsedStream = streamSource.process(new ParsingProcessFunction(converter));
+            for (String table : dorisTables) {
+                OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
+                DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
+                int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
+                sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
+            }
+ }
}
}
@@ -171,6 +166,13 @@ public abstract class DatabaseSync {
return builder.build();
}
+ /**
+ * create doris sink for multi table
+ */
+ public DorisSink<String> buildDorisSink(){
+ return buildDorisSink(null);
+ }
+
/**
* create doris sink
*/
@@ -179,17 +181,20 @@ public abstract class DatabaseSync {
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
-        String labelPrefix = sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(fenodes)
.setBenodes(benodes)
- .setTableIdentifier(database + "." + table)
.setUsername(user)
.setPassword(passwd);
sinkConfig.getOptional(DorisConfigOptions.AUTO_REDIRECT).ifPresent(dorisBuilder::setAutoRedirect);
+ //single sink not need table identifier
+ if(!singleSink && !StringUtils.isNullOrWhitespaceOnly(table)){
+ dorisBuilder.setTableIdentifier(database + "." + table);
+ }
+
Properties pro = new Properties();
//default json data format
pro.setProperty("format", "json");
@@ -198,9 +203,9 @@ public abstract class DatabaseSync {
        Properties streamLoadProp = DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap());
pro.putAll(streamLoadProp);
        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder()
- .setLabelPrefix(String.join("-", labelPrefix, database, table))
.setStreamLoadProp(pro);
+        sinkConfig.getOptional(DorisConfigOptions.SINK_LABEL_PREFIX).ifPresent(executionBuilder::setLabelPrefix);
sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE).ifPresent(executionBuilder::setDeletable);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT).ifPresent(executionBuilder::setBufferCount);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE).ifPresent(executionBuilder::setBufferSize);
@@ -234,6 +239,7 @@ public abstract class DatabaseSync {
.setDorisOptions(dorisBuilder.build())
.setNewSchemaChange(newSchemaChange)
.setExecutionOptions(executionOptions)
+ .setTableMapping(tableMapping)
.build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
@@ -278,6 +284,82 @@ public abstract class DatabaseSync {
return multiToOneRulesPattern;
}
+
+ public DatabaseSync setEnv(StreamExecutionEnvironment env) {
+ this.env = env;
+ return this;
+ }
+
+ public DatabaseSync setConfig(Configuration config) {
+ this.config = config;
+ return this;
+ }
+
+ public DatabaseSync setDatabase(String database) {
+ this.database = database;
+ return this;
+ }
+
+ public DatabaseSync setIncludingTables(String includingTables) {
+ this.includingTables = includingTables;
+ return this;
+ }
+
+ public DatabaseSync setExcludingTables(String excludingTables) {
+ this.excludingTables = excludingTables;
+ return this;
+ }
+
+ public DatabaseSync setMultiToOneOrigin(String multiToOneOrigin) {
+ this.multiToOneOrigin = multiToOneOrigin;
+ return this;
+ }
+
+ public DatabaseSync setMultiToOneTarget(String multiToOneTarget) {
+ this.multiToOneTarget = multiToOneTarget;
+ return this;
+ }
+
+ public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
+ this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public DatabaseSync setSinkConfig(Configuration sinkConfig) {
+ this.sinkConfig = sinkConfig;
+ return this;
+ }
+
+ public DatabaseSync setIgnoreDefaultValue(boolean ignoreDefaultValue) {
+ this.ignoreDefaultValue = ignoreDefaultValue;
+ return this;
+ }
+
+ public DatabaseSync setCreateTableOnly(boolean createTableOnly) {
+ this.createTableOnly = createTableOnly;
+ return this;
+ }
+
+ public DatabaseSync setNewSchemaChange(boolean newSchemaChange) {
+ this.newSchemaChange = newSchemaChange;
+ return this;
+ }
+
+ public DatabaseSync setSingleSink(boolean singleSink) {
+ this.singleSink = singleSink;
+ return this;
+ }
+
+ public DatabaseSync setTablePrefix(String tablePrefix) {
+ this.tablePrefix = tablePrefix;
+ return this;
+ }
+
+ public DatabaseSync setTableSuffix(String tableSuffix) {
+ this.tableSuffix = tableSuffix;
+ return this;
+ }
+
public static class TableNameConverter implements Serializable {
private static final long serialVersionUID = 1L;
private final String prefix;
@@ -322,4 +404,6 @@ public abstract class DatabaseSync {
return target;
}
}
+
+
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
similarity index 96%
rename from flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
rename to flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
index 563c848..a1c17f6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
@@ -14,11 +14,10 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.tools.cdc.mysql;
+package org.apache.doris.flink.tools.cdc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 9168cb5..51d8013 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.tools.cdc;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.flink.util.StringUtils;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
@@ -27,9 +28,11 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.StringJoiner;
public abstract class SourceSchema {
private final String databaseName;
+ private final String schemaName;
private final String tableName;
private final String tableComment;
private final LinkedHashMap<String, FieldSchema> fields;
@@ -40,6 +43,7 @@ public abstract class SourceSchema {
DatabaseMetaData metaData, String databaseName, String schemaName,
String tableName, String tableComment)
throws Exception {
this.databaseName = databaseName;
+ this.schemaName = schemaName;
this.tableName = tableName;
this.tableComment = tableComment;
@@ -74,6 +78,26 @@ public abstract class SourceSchema {
    public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
+ public String getTableIdentifier(){
+ return getString(databaseName, schemaName, tableName);
+ }
+
+    public static String getString(String databaseName, String schemaName, String tableName) {
+ StringJoiner identifier = new StringJoiner(".");
+ if(!StringUtils.isNullOrWhitespaceOnly(databaseName)){
+ identifier.add(databaseName);
+ }
+ if(!StringUtils.isNullOrWhitespaceOnly(schemaName)){
+ identifier.add(schemaName);
+ }
+
+ if(!StringUtils.isNullOrWhitespaceOnly(tableName)){
+ identifier.add(tableName);
+ }
+
+ return identifier.toString();
+ }
+
public TableSchema convertTableSchema(Map<String, String> tableProps) {
TableSchema tableSchema = new TableSchema();
tableSchema.setModel(this.model);
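The getString helper added above backs both SourceSchema.getTableIdentifier() and the serializer's getCdcTableIdentifier(): it dot-joins whichever of database, schema and table are non-empty. Illustrative values matching the new tests:

    String mysqlStyle = SourceSchema.getString("test", null, "t1");  // "test.t1"  (no schema component)
    String mssqlStyle = SourceSchema.getString("test", "dbo", "t1"); // "test.dbo.t1"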
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index bc037a8..de549ef 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -361,4 +362,61 @@ public class TestJsonDebeziumSchemaSerializer {
}
return targetField;
}
+
+ @Test
+ public void testGetCdcTableIdentifier() throws Exception {
+        String insert = "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\ [...]
+ JsonNode recordRoot = objectMapper.readTree(insert);
+ String identifier = serializer.getCdcTableIdentifier(recordRoot);
+ Assert.assertEquals( "test.t1", identifier);
+
+        String insertSchema = "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\ [...]
+        String identifierSchema = serializer.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
+ Assert.assertEquals( "test.dbo.t1", identifierSchema);
+
+        String ddl = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\" [...]
+        String ddlRes = serializer.getCdcTableIdentifier(objectMapper.readTree(ddl));
+ Assert.assertEquals( "test.t1", ddlRes);
+ }
+
+ @Test
+ public void testGetDorisTableIdentifier() throws Exception {
+ Map<String, String> map = new HashMap<>();
+ map.put("test.dbo.t1", "test.t1");
+ serializer.setTableMapping(map);
+ String identifier = serializer.getDorisTableIdentifier("test.dbo.t1");
+ Assert.assertEquals( "test.t1", identifier);
+
+ identifier = serializer.getDorisTableIdentifier("test.t1");
+ Assert.assertEquals("test.t1", identifier);
+
+ String tmp = dorisOptions.getTableIdentifier();
+ dorisOptions.setTableIdentifier(null);
+ identifier = serializer.getDorisTableIdentifier("test.t1");
+ Assert.assertNull( identifier);
+ dorisOptions.setTableIdentifier(tmp);
+ }
+
+ @Test
+ public void testSchemaChangeMultiTable() throws Exception {
+ Map<String, String> map = new HashMap<>();
+ map.put("mysql.t1", "doris.t1");
+ map.put("mysql.t2", "doris.t2");
+ serializer.setTableMapping(map);
+ String tmp = dorisOptions.getTableIdentifier();
+ dorisOptions.setTableIdentifier(null);
+        String ddl1 = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\ [...]
+        String ddl2 = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t2\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\ [...]
+ String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(600)";
+ String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(600)";
+
+        Assert.assertEquals(exceptDDL1, serializer.extractDDL(objectMapper.readTree(ddl1)));
+        Assert.assertEquals(exceptDDL2, serializer.extractDDL(objectMapper.readTree(ddl2)));
+
+        //Assert.assertEquals(exceptDDL1, serializer.extractDDLList(objectMapper.readTree(ddl1)));
+        //Assert.assertEquals(exceptDDL2, serializer.extractDDLList(objectMapper.readTree(ddl2)));
+
+ dorisOptions.setTableIdentifier(tmp);
+ }
+
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 875fb4c..f0493a9 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -72,7 +72,21 @@ public class CdcMysqlSyncDatabaseCase {
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 9b6277f..6140138 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -76,7 +76,21 @@ public class CdcOraclelSyncDatabaseCase {
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new OracleDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
env.execute(String.format("Oracle-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 87fa871..4887f36 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -79,7 +79,21 @@ public class CdcPostgresSyncDatabaseCase {
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new PostgresDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
        env.execute(String.format("Postgres-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index d247500..7129e77 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -77,7 +77,21 @@ public class CdcSqlServerSyncDatabaseCase {
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new SqlServerDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
        env.execute(String.format("SqlServer-Doris Database Sync: %s", database));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]