This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 43e0e5c [Improve] support multi database sync (#322)
43e0e5c is described below
commit 43e0e5cf9b832854ea228fb093077872e3a311b6
Author: wudi <[email protected]>
AuthorDate: Tue Mar 5 10:17:57 2024 +0800
[Improve] support multi database sync (#322)
---
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 46 +++++++++++++---------
.../apache/doris/flink/tools/cdc/SourceSchema.java | 4 ++
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 38 +++++++++++-------
.../doris/flink/tools/cdc/mysql/MysqlSchema.java | 5 +++
.../doris/flink/tools/cdc/oracle/OracleSchema.java | 5 +++
.../flink/tools/cdc/postgres/PostgresSchema.java | 5 +++
.../flink/tools/cdc/sqlserver/SqlServerSchema.java | 5 +++
.../doris/flink/tools/cdc/MockSourceSchema.java} | 21 +++++-----
8 files changed, 87 insertions(+), 42 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index d713355..3291b4c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.tools.cdc;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -115,14 +116,14 @@ public abstract class DatabaseSync {
List<SourceSchema> schemaList = getSchemaList();
Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized.");
- if (!dorisSystem.databaseExists(database)) {
+
+ if (!StringUtils.isNullOrWhitespaceOnly(database)
+ && !dorisSystem.databaseExists(database)) {
LOG.info("database {} not exist, created", database);
dorisSystem.createDatabase(database);
}
-
List<String> syncTables = new ArrayList<>();
- List<String> dorisTables = new ArrayList<>();
-
+ List<Tuple2<String, String>> dorisTables = new ArrayList<>();
Map<String, Integer> tableBucketsMap = null;
if (tableConfig.containsKey("table-buckets")) {
tableBucketsMap =
getTableBuckets(tableConfig.get("table-buckets"));
@@ -130,23 +131,32 @@ public abstract class DatabaseSync {
Set<String> bucketsTable = new HashSet<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
+ String targetDb = database;
+ // Synchronize multiple databases using the src database name
+ if (StringUtils.isNullOrWhitespaceOnly(targetDb)) {
+ targetDb = schema.getDatabaseName();
+ }
+ if (StringUtils.isNullOrWhitespaceOnly(database)
+ && !dorisSystem.databaseExists(targetDb)) {
+ LOG.info("database {} not exist, created", targetDb);
+ dorisSystem.createDatabase(targetDb);
+ }
String dorisTable = converter.convert(schema.getTableName());
-
// Calculate the mapping relationship between upstream and downstream tables
tableMapping.put(
- schema.getTableIdentifier(), String.format("%s.%s", database, dorisTable));
- if (!dorisSystem.tableExists(database, dorisTable)) {
+ schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable));
+ if (!dorisSystem.tableExists(targetDb, dorisTable)) {
TableSchema dorisSchema =
schema.convertTableSchema(tableConfig);
// set doris target database
- dorisSchema.setDatabase(database);
+ dorisSchema.setDatabase(targetDb);
dorisSchema.setTable(dorisTable);
if (tableBucketsMap != null) {
setTableSchemaBuckets(tableBucketsMap, dorisSchema,
dorisTable, bucketsTable);
}
dorisSystem.createTable(dorisSchema);
}
- if (!dorisTables.contains(dorisTable)) {
- dorisTables.add(dorisTable);
+ if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) {
+ dorisTables.add(Tuple2.of(targetDb, dorisTable));
}
}
if (createTableOnly) {
@@ -160,18 +170,18 @@ public abstract class DatabaseSync {
} else {
SingleOutputStreamOperator<Void> parsedStream =
streamSource.process(buildProcessFunction());
- for (String table : dorisTables) {
+ for (Tuple2<String, String> dbTbl : dorisTables) {
OutputTag<String> recordOutputTag =
- ParsingProcessFunction.createRecordOutputTag(table);
+ ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
DataStream<String> sideOutput =
parsedStream.getSideOutput(recordOutputTag);
int sinkParallel =
sinkConfig.getInteger(
DorisConfigOptions.SINK_PARALLELISM,
sideOutput.getParallelism());
sideOutput
- .sinkTo(buildDorisSink(table))
+ .sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1))
.setParallelism(sinkParallel)
- .name(table)
- .uid(table);
+ .name(dbTbl.f1)
+ .uid(dbTbl.f1);
}
}
}
@@ -205,7 +215,7 @@ public abstract class DatabaseSync {
}
/** create doris sink. */
- public DorisSink<String> buildDorisSink(String table) {
+ public DorisSink<String> buildDorisSink(String tableIdentifier) {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
@@ -225,8 +235,8 @@ public abstract class DatabaseSync {
.ifPresent(dorisBuilder::setAutoRedirect);
// single sink not need table identifier
- if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(table)) {
- dorisBuilder.setTableIdentifier(database + "." + table);
+ if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(tableIdentifier)) {
+ dorisBuilder.setTableIdentifier(tableIdentifier);
}
Properties pro = new Properties();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index e09eb00..cbef59f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -48,6 +48,10 @@ public abstract class SourceSchema {
this.tableComment = tableComment;
}
+ public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
+
+ public abstract String getCdcTableName();
+
public String getTableIdentifier() {
return getString(databaseName, schemaName, tableName);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index f00acfc..8da57f6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -100,24 +100,34 @@ public class MysqlDatabaseSync extends DatabaseSync {
@Override
public List<SourceSchema> getSchemaList() throws Exception {
String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
+
List<SourceSchema> schemaList = new ArrayList<>();
try (Connection conn = getConnection()) {
DatabaseMetaData metaData = conn.getMetaData();
- try (ResultSet tables =
- metaData.getTables(databaseName, null, "%", new String[] {"TABLE"})) {
- while (tables.next()) {
- String tableName = tables.getString("TABLE_NAME");
- String tableComment = tables.getString("REMARKS");
- if (!isSyncNeeded(tableName)) {
- continue;
+ try (ResultSet catalogs = metaData.getCatalogs()) {
+ while (catalogs.next()) {
+ String tableCatalog = catalogs.getString("TABLE_CAT");
+ if (tableCatalog.matches(databaseName)) {
+ try (ResultSet tables =
+ metaData.getTables(
+ tableCatalog, null, "%", new String[] {"TABLE"})) {
+ while (tables.next()) {
+ String tableName = tables.getString("TABLE_NAME");
+ String tableComment = tables.getString("REMARKS");
+ if (!isSyncNeeded(tableName)) {
+ continue;
+ }
+ SourceSchema sourceSchema =
+ new MysqlSchema(
+ metaData, tableCatalog, tableName, tableComment);
+ sourceSchema.setModel(
+ !sourceSchema.primaryKeys.isEmpty()
+ ? DataModel.UNIQUE
+ : DataModel.DUPLICATE);
+ schemaList.add(sourceSchema);
+ }
+ }
}
- SourceSchema sourceSchema =
- new MysqlSchema(metaData, databaseName, tableName, tableComment);
- sourceSchema.setModel(
- !sourceSchema.primaryKeys.isEmpty()
- ? DataModel.UNIQUE
- : DataModel.DUPLICATE);
- schemaList.add(sourceSchema);
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
index f84ca94..3a9ffbd 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
@@ -32,4 +32,9 @@ public class MysqlSchema extends JdbcSourceSchema {
public String convertToDorisType(String fieldType, Integer precision,
Integer scale) {
return MysqlType.toDorisType(fieldType, precision, scale);
}
+
+ @Override
+ public String getCdcTableName() {
+ return databaseName + "\\." + tableName;
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
index f843b6d..e059181 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
@@ -37,4 +37,9 @@ public class OracleSchema extends JdbcSourceSchema {
public String convertToDorisType(String fieldType, Integer precision,
Integer scale) {
return OracleType.toDorisType(fieldType, precision, scale);
}
+
+ @Override
+ public String getCdcTableName() {
+ return schemaName + "\\." + tableName;
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
index 3208116..a431c41 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
@@ -37,4 +37,9 @@ public class PostgresSchema extends JdbcSourceSchema {
public String convertToDorisType(String fieldType, Integer precision,
Integer scale) {
return PostgresType.toDorisType(fieldType, precision, scale);
}
+
+ @Override
+ public String getCdcTableName() {
+ return schemaName + "\\." + tableName;
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
index 6d5ab9a..18131ce 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
@@ -37,4 +37,9 @@ public class SqlServerSchema extends JdbcSourceSchema {
public String convertToDorisType(String fieldType, Integer precision,
Integer scale) {
return SqlServerType.toDorisType(fieldType, precision, scale);
}
+
+ @Override
+ public String getCdcTableName() {
+ return schemaName + "\\." + tableName;
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java
similarity index 67%
copy from flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
copy to flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java
index f84ca94..3be56e1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java
@@ -15,21 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.tools.cdc.mysql;
+package org.apache.doris.flink.tools.cdc;
-import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
+public class MockSourceSchema extends SourceSchema {
-import java.sql.DatabaseMetaData;
-
-public class MysqlSchema extends JdbcSourceSchema {
-
- public MysqlSchema(
- DatabaseMetaData metaData, String databaseName, String tableName, String tableComment)
+ public MockSourceSchema(String databaseName, String schemaName, String tableName)
throws Exception {
- super(metaData, databaseName, null, tableName, tableComment);
+ super(databaseName, schemaName, tableName, "");
}
+ @Override
public String convertToDorisType(String fieldType, Integer precision,
Integer scale) {
- return MysqlType.toDorisType(fieldType, precision, scale);
+ return null;
+ }
+
+ @Override
+ public String getCdcTableName() {
+ return databaseName + "\\." + tableName;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]