This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 43e0e5c  [Improve] support multi database sync (#322)
43e0e5c is described below

commit 43e0e5cf9b832854ea228fb093077872e3a311b6
Author: wudi <[email protected]>
AuthorDate: Tue Mar 5 10:17:57 2024 +0800

    [Improve] support multi database sync (#322)
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 46 +++++++++++++---------
 .../apache/doris/flink/tools/cdc/SourceSchema.java |  4 ++
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   | 38 +++++++++++-------
 .../doris/flink/tools/cdc/mysql/MysqlSchema.java   |  5 +++
 .../doris/flink/tools/cdc/oracle/OracleSchema.java |  5 +++
 .../flink/tools/cdc/postgres/PostgresSchema.java   |  5 +++
 .../flink/tools/cdc/sqlserver/SqlServerSchema.java |  5 +++
 .../doris/flink/tools/cdc/MockSourceSchema.java}   | 21 +++++-----
 8 files changed, 87 insertions(+), 42 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index d713355..3291b4c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.tools.cdc;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -115,14 +116,14 @@ public abstract class DatabaseSync {
 
         List<SourceSchema> schemaList = getSchemaList();
         Preconditions.checkState(!schemaList.isEmpty(), "No tables to be 
synchronized.");
-        if (!dorisSystem.databaseExists(database)) {
+
+        if (!StringUtils.isNullOrWhitespaceOnly(database)
+                && !dorisSystem.databaseExists(database)) {
             LOG.info("database {} not exist, created", database);
             dorisSystem.createDatabase(database);
         }
-
         List<String> syncTables = new ArrayList<>();
-        List<String> dorisTables = new ArrayList<>();
-
+        List<Tuple2<String, String>> dorisTables = new ArrayList<>();
         Map<String, Integer> tableBucketsMap = null;
         if (tableConfig.containsKey("table-buckets")) {
             tableBucketsMap = 
getTableBuckets(tableConfig.get("table-buckets"));
@@ -130,23 +131,32 @@ public abstract class DatabaseSync {
         Set<String> bucketsTable = new HashSet<>();
         for (SourceSchema schema : schemaList) {
             syncTables.add(schema.getTableName());
+            String targetDb = database;
+            // Synchronize multiple databases using the src database name
+            if (StringUtils.isNullOrWhitespaceOnly(targetDb)) {
+                targetDb = schema.getDatabaseName();
+            }
+            if (StringUtils.isNullOrWhitespaceOnly(database)
+                    && !dorisSystem.databaseExists(targetDb)) {
+                LOG.info("database {} not exist, created", targetDb);
+                dorisSystem.createDatabase(targetDb);
+            }
             String dorisTable = converter.convert(schema.getTableName());
-
             // Calculate the mapping relationship between upstream and 
downstream tables
             tableMapping.put(
-                    schema.getTableIdentifier(), String.format("%s.%s", 
database, dorisTable));
-            if (!dorisSystem.tableExists(database, dorisTable)) {
+                    schema.getTableIdentifier(), String.format("%s.%s", 
targetDb, dorisTable));
+            if (!dorisSystem.tableExists(targetDb, dorisTable)) {
                 TableSchema dorisSchema = 
schema.convertTableSchema(tableConfig);
                 // set doris target database
-                dorisSchema.setDatabase(database);
+                dorisSchema.setDatabase(targetDb);
                 dorisSchema.setTable(dorisTable);
                 if (tableBucketsMap != null) {
                     setTableSchemaBuckets(tableBucketsMap, dorisSchema, 
dorisTable, bucketsTable);
                 }
                 dorisSystem.createTable(dorisSchema);
             }
-            if (!dorisTables.contains(dorisTable)) {
-                dorisTables.add(dorisTable);
+            if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) {
+                dorisTables.add(Tuple2.of(targetDb, dorisTable));
             }
         }
         if (createTableOnly) {
@@ -160,18 +170,18 @@ public abstract class DatabaseSync {
         } else {
             SingleOutputStreamOperator<Void> parsedStream =
                     streamSource.process(buildProcessFunction());
-            for (String table : dorisTables) {
+            for (Tuple2<String, String> dbTbl : dorisTables) {
                 OutputTag<String> recordOutputTag =
-                        ParsingProcessFunction.createRecordOutputTag(table);
+                        ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
                 DataStream<String> sideOutput = 
parsedStream.getSideOutput(recordOutputTag);
                 int sinkParallel =
                         sinkConfig.getInteger(
                                 DorisConfigOptions.SINK_PARALLELISM, 
sideOutput.getParallelism());
                 sideOutput
-                        .sinkTo(buildDorisSink(table))
+                        .sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1))
                         .setParallelism(sinkParallel)
-                        .name(table)
-                        .uid(table);
+                        .name(dbTbl.f1)
+                        .uid(dbTbl.f1);
             }
         }
     }
@@ -205,7 +215,7 @@ public abstract class DatabaseSync {
     }
 
     /** create doris sink. */
-    public DorisSink<String> buildDorisSink(String table) {
+    public DorisSink<String> buildDorisSink(String tableIdentifier) {
         String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
         String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
         String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
@@ -225,8 +235,8 @@ public abstract class DatabaseSync {
                 .ifPresent(dorisBuilder::setAutoRedirect);
 
         // single sink not need table identifier
-        if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(table)) {
-            dorisBuilder.setTableIdentifier(database + "." + table);
+        if (!singleSink && 
!StringUtils.isNullOrWhitespaceOnly(tableIdentifier)) {
+            dorisBuilder.setTableIdentifier(tableIdentifier);
         }
 
         Properties pro = new Properties();
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index e09eb00..cbef59f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -48,6 +48,10 @@ public abstract class SourceSchema {
         this.tableComment = tableComment;
     }
 
+    public abstract String convertToDorisType(String fieldType, Integer 
precision, Integer scale);
+
+    public abstract String getCdcTableName();
+
     public String getTableIdentifier() {
         return getString(databaseName, schemaName, tableName);
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index f00acfc..8da57f6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -100,24 +100,34 @@ public class MysqlDatabaseSync extends DatabaseSync {
     @Override
     public List<SourceSchema> getSchemaList() throws Exception {
         String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
+
         List<SourceSchema> schemaList = new ArrayList<>();
         try (Connection conn = getConnection()) {
             DatabaseMetaData metaData = conn.getMetaData();
-            try (ResultSet tables =
-                    metaData.getTables(databaseName, null, "%", new String[] 
{"TABLE"})) {
-                while (tables.next()) {
-                    String tableName = tables.getString("TABLE_NAME");
-                    String tableComment = tables.getString("REMARKS");
-                    if (!isSyncNeeded(tableName)) {
-                        continue;
+            try (ResultSet catalogs = metaData.getCatalogs()) {
+                while (catalogs.next()) {
+                    String tableCatalog = catalogs.getString("TABLE_CAT");
+                    if (tableCatalog.matches(databaseName)) {
+                        try (ResultSet tables =
+                                metaData.getTables(
+                                        tableCatalog, null, "%", new String[] 
{"TABLE"})) {
+                            while (tables.next()) {
+                                String tableName = 
tables.getString("TABLE_NAME");
+                                String tableComment = 
tables.getString("REMARKS");
+                                if (!isSyncNeeded(tableName)) {
+                                    continue;
+                                }
+                                SourceSchema sourceSchema =
+                                        new MysqlSchema(
+                                                metaData, tableCatalog, 
tableName, tableComment);
+                                sourceSchema.setModel(
+                                        !sourceSchema.primaryKeys.isEmpty()
+                                                ? DataModel.UNIQUE
+                                                : DataModel.DUPLICATE);
+                                schemaList.add(sourceSchema);
+                            }
+                        }
                     }
-                    SourceSchema sourceSchema =
-                            new MysqlSchema(metaData, databaseName, tableName, 
tableComment);
-                    sourceSchema.setModel(
-                            !sourceSchema.primaryKeys.isEmpty()
-                                    ? DataModel.UNIQUE
-                                    : DataModel.DUPLICATE);
-                    schemaList.add(sourceSchema);
                 }
             }
         }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
index f84ca94..3a9ffbd 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
@@ -32,4 +32,9 @@ public class MysqlSchema extends JdbcSourceSchema {
     public String convertToDorisType(String fieldType, Integer precision, 
Integer scale) {
         return MysqlType.toDorisType(fieldType, precision, scale);
     }
+
+    @Override
+    public String getCdcTableName() {
+        return databaseName + "\\." + tableName;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
index f843b6d..e059181 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
@@ -37,4 +37,9 @@ public class OracleSchema extends JdbcSourceSchema {
     public String convertToDorisType(String fieldType, Integer precision, 
Integer scale) {
         return OracleType.toDorisType(fieldType, precision, scale);
     }
+
+    @Override
+    public String getCdcTableName() {
+        return schemaName + "\\." + tableName;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
index 3208116..a431c41 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
@@ -37,4 +37,9 @@ public class PostgresSchema extends JdbcSourceSchema {
     public String convertToDorisType(String fieldType, Integer precision, 
Integer scale) {
         return PostgresType.toDorisType(fieldType, precision, scale);
     }
+
+    @Override
+    public String getCdcTableName() {
+        return schemaName + "\\." + tableName;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
index 6d5ab9a..18131ce 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
@@ -37,4 +37,9 @@ public class SqlServerSchema extends JdbcSourceSchema {
     public String convertToDorisType(String fieldType, Integer precision, 
Integer scale) {
         return SqlServerType.toDorisType(fieldType, precision, scale);
     }
+
+    @Override
+    public String getCdcTableName() {
+        return schemaName + "\\." + tableName;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java
similarity index 67%
copy from 
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
copy to 
flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java
index f84ca94..3be56e1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java
@@ -15,21 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.tools.cdc.mysql;
+package org.apache.doris.flink.tools.cdc;
 
-import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
+public class MockSourceSchema extends SourceSchema {
 
-import java.sql.DatabaseMetaData;
-
-public class MysqlSchema extends JdbcSourceSchema {
-
-    public MysqlSchema(
-            DatabaseMetaData metaData, String databaseName, String tableName, 
String tableComment)
+    public MockSourceSchema(String databaseName, String schemaName, String 
tableName)
             throws Exception {
-        super(metaData, databaseName, null, tableName, tableComment);
+        super(databaseName, schemaName, tableName, "");
     }
 
+    @Override
     public String convertToDorisType(String fieldType, Integer precision, 
Integer scale) {
-        return MysqlType.toDorisType(fieldType, precision, scale);
+        return null;
+    }
+
+    @Override
+    public String getCdcTableName() {
+        return databaseName + "\\." + tableName;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to