This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 060e13da [feature](cdc) add ignore-incompatible option (#371)
060e13da is described below
commit 060e13dad0b7a7c94fee1c397739f0df29da1e2b
Author: Petrichor <[email protected]>
AuthorDate: Wed Jul 3 15:40:26 2024 +0800
[feature](cdc) add ignore-incompatible option (#371)
---
.../doris/flink/catalog/doris/DorisSystem.java | 3 +-
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 2 +
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 65 ++++++++++++++++++----
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 2 +
.../tools/cdc/CdcOraclelSyncDatabaseCase.java | 2 +
.../tools/cdc/CdcPostgresSyncDatabaseCase.java | 2 +
.../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 2 +
7 files changed, 64 insertions(+), 14 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 31d32e01..ab26e308 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -281,8 +281,7 @@ public class DorisSystem implements Serializable {
}
private static List<String> identifier(List<String> name) {
- List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
- return result;
+ return name.stream().map(DorisSystem::identifier).collect(Collectors.toList());
}
public static String identifier(String name) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 7443ef8a..38b942ea 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -132,6 +132,7 @@ public class CdcTools {
String multiToOneTarget = params.get("multi-to-one-target");
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
+ boolean ignoreIncompatible = params.has("ignore-incompatible");
boolean singleSink = params.has("single-sink");
Preconditions.checkArgument(params.has("sink-conf"));
@@ -155,6 +156,7 @@ public class CdcTools {
.setTableConfig(tableMap)
.setCreateTableOnly(createTableOnly)
.setSingleSink(singleSink)
+ .setIgnoreIncompatible(ignoreIncompatible)
.create();
databaseSync.build();
if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 691eaafa..a4d0511b 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -34,6 +34,7 @@ import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisSystemException;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.WriteMode;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
+import java.sql.SQLSyntaxErrorException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -71,6 +73,7 @@ public abstract class DatabaseSync {
protected Map<String, String> tableConfig = new HashMap<>();
protected Configuration sinkConfig;
protected boolean ignoreDefaultValue;
+ protected boolean ignoreIncompatible;
public StreamExecutionEnvironment env;
private boolean createTableOnly = false;
@@ -128,7 +131,9 @@ public abstract class DatabaseSync {
if (tableConfig.containsKey("table-buckets")) {
tableBucketsMap =
getTableBuckets(tableConfig.get("table-buckets"));
}
- Set<String> bucketsTable = new HashSet<>();
+
+ // Set of table names that have assigned bucket numbers.
+ Set<String> tablesWithBucketsAssigned = new HashSet<>();
Set<String> targetDbSet = new HashSet<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
@@ -147,17 +152,13 @@ public abstract class DatabaseSync {
// Calculate the mapping relationship between upstream and downstream tables
tableMapping.put(
schema.getTableIdentifier(), String.format("%s.%s",
targetDb, dorisTable));
- if (!dorisSystem.tableExists(targetDb, dorisTable)) {
- TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
- // set doris target database
- dorisSchema.setDatabase(targetDb);
- dorisSchema.setTable(dorisTable);
- if (tableBucketsMap != null) {
- setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, bucketsTable);
- }
- dorisSystem.createTable(dorisSchema);
- }
- if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) {
+ if (tryCreateTableIfAbsent(
+ dorisSystem,
+ targetDb,
+ dorisTable,
+ schema,
+ tableBucketsMap,
+ tablesWithBucketsAssigned)) {
dorisTables.add(Tuple2.of(targetDb, dorisTable));
}
}
@@ -462,6 +463,41 @@ public abstract class DatabaseSync {
}
}
+ private boolean tryCreateTableIfAbsent(
+ DorisSystem dorisSystem,
+ String targetDb,
+ String dorisTable,
+ SourceSchema schema,
+ Map<String, Integer> tableBucketsMap,
+ Set<String> tableBucketsSet) {
+ if (!dorisSystem.tableExists(targetDb, dorisTable)) {
+ TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
+ dorisSchema.setDatabase(targetDb);
+ dorisSchema.setTable(dorisTable);
+ // set the table buckets of table
+ if (tableBucketsMap != null) {
+ setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, tableBucketsSet);
+ }
+ try {
+ dorisSystem.createTable(dorisSchema);
+ return true;
+ } catch (Exception ex) {
+ handleTableCreationFailure(ex);
+ }
+ }
+ return false;
+ }
+
+ private void handleTableCreationFailure(Exception ex) throws DorisSystemException {
+ if (ignoreIncompatible && ex.getCause() instanceof SQLSyntaxErrorException) {
+ LOG.warn(
+ "Doris schema and source table schema are not compatible. Error: {} ",
+ ex.getCause().toString());
+ } else {
+ throw new DorisSystemException("Failed to create table due to: ", ex);
+ }
+ }
+
public DatabaseSync setEnv(StreamExecutionEnvironment env) {
this.env = env;
return this;
@@ -529,6 +565,11 @@ public abstract class DatabaseSync {
return this;
}
+ public DatabaseSync setIgnoreIncompatible(boolean ignoreIncompatible) {
+ this.ignoreIncompatible = ignoreIncompatible;
+ return this;
+ }
+
public DatabaseSync setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
return this;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 88176e84..2410ddac 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -73,6 +73,7 @@ public class CdcMysqlSyncDatabaseCase {
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
boolean singleSink = false;
+ boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync
.setEnv(env)
@@ -90,6 +91,7 @@ public class CdcMysqlSyncDatabaseCase {
.setCreateTableOnly(false)
.setNewSchemaChange(useNewSchemaChange)
.setSingleSink(singleSink)
+ .setIgnoreIncompatible(ignoreIncompatible)
.create();
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 7bfa7477..fba5866c 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -77,6 +77,7 @@ public class CdcOraclelSyncDatabaseCase {
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
+ boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new OracleDatabaseSync();
databaseSync
.setEnv(env)
@@ -93,6 +94,7 @@ public class CdcOraclelSyncDatabaseCase {
.setTableConfig(tableConfig)
.setCreateTableOnly(false)
.setNewSchemaChange(useNewSchemaChange)
+ .setIgnoreIncompatible(ignoreIncompatible)
.create();
databaseSync.build();
env.execute(String.format("Oracle-Doris Database Sync: %s", database));
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index b9afc98b..6c933409 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -79,6 +79,7 @@ public class CdcPostgresSyncDatabaseCase {
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
+ boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new PostgresDatabaseSync();
databaseSync
.setEnv(env)
@@ -95,6 +96,7 @@ public class CdcPostgresSyncDatabaseCase {
.setTableConfig(tableConfig)
.setCreateTableOnly(false)
.setNewSchemaChange(useNewSchemaChange)
+ .setIgnoreIncompatible(ignoreIncompatible)
.create();
databaseSync.build();
env.execute(String.format("Postgres-Doris Database Sync: %s",
database));
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 912bcfc7..9fec63b6 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -77,6 +77,7 @@ public class CdcSqlServerSyncDatabaseCase {
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
+ boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new SqlServerDatabaseSync();
databaseSync
.setEnv(env)
@@ -93,6 +94,7 @@ public class CdcSqlServerSyncDatabaseCase {
.setTableConfig(tableConfig)
.setCreateTableOnly(false)
.setNewSchemaChange(useNewSchemaChange)
+ .setIgnoreIncompatible(ignoreIncompatible)
.create();
databaseSync.build();
env.execute(String.format("SqlServer-Doris Database Sync: %s",
database));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]