This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new adbece9c2 [INLONG-7790][Sort] Capture newly added tables when setting "scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset" (#7794)
adbece9c2 is described below

commit adbece9c227c80372a60d3ab4ea1d765d2f32510
Author: emhui <[email protected]>
AuthorDate: Tue Apr 11 15:36:32 2023 +0800

    [INLONG-7790][Sort] Capture newly added tables when setting "scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset" (#7794)
---
 .../cdc/mysql/source/reader/MySqlSourceReader.java | 18 +++++++++---
 .../mysql/source/utils/TableDiscoveryUtils.java    | 33 ++++++++++++++++++----
 2 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 0a7ea82c6..10ae8b4f5 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -235,12 +235,22 @@ public class MySqlSourceReader<T>
 
     private MySqlBinlogSplit 
discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
         final String splitId = split.splitId();
-        if (split.getTableSchemas().isEmpty()) {
+        if (split.getTableSchemas().isEmpty() || 
sourceConfig.isScanNewlyAddedTableEnabled()) {
             try (MySqlConnection jdbc =
                     
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
-                Map<TableId, TableChanges.TableChange> tableSchemas =
-                        
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
-                LOG.info("The table schema discovery for binlog split {} 
success", splitId);
+                Map<TableId, TableChanges.TableChange> tableSchemas;
+                if (split.getTableSchemas().isEmpty()) {
+                    tableSchemas =
+                            
TableDiscoveryUtils.discoverSchemaForCapturedTableSchemas(sourceConfig, jdbc);
+                    LOG.info("The table schema discovery for binlog split {} 
success", splitId);
+                } else {
+                    List<TableId> existedTables = new 
ArrayList<>(split.getTableSchemas().keySet());
+                    tableSchemas =
+                            
TableDiscoveryUtils.discoverSchemaForNewAddedTables(existedTables, 
sourceConfig, jdbc);
+                    LOG.info(
+                            "The table schema discovery for new added tables 
of binlog split {} success",
+                            split.splitId());
+                }
                 return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
             } catch (SQLException e) {
                 LOG.error("Failed to obtains table schemas due to {}", 
e.getMessage());
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
index abf925e89..005f36332 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
@@ -22,6 +22,8 @@ import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.RelationalTableFilters;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.stream.Collectors;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.sort.cdc.mysql.schema.MySqlSchema;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
@@ -102,7 +104,7 @@ public class TableDiscoveryUtils {
     /**
      * Discover schemas of table.
      */
-    public static Map<TableId, TableChanges.TableChange> 
discoverCapturedTableSchemas(
+    public static Map<TableId, TableChanges.TableChange> 
discoverSchemaForCapturedTableSchemas(
             MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
         final List<TableId> capturedTableIds;
         try {
@@ -110,19 +112,38 @@ public class TableDiscoveryUtils {
         } catch (SQLException e) {
             throw new FlinkRuntimeException("Failed to discover captured 
tables", e);
         }
+        return discoverSchemaForCapturedTables(capturedTableIds, sourceConfig, 
jdbc);
+    }
+
+    public static Map<TableId, TableChange> discoverSchemaForNewAddedTables(
+            List<TableId> existedTables, MySqlSourceConfig sourceConfig, 
MySqlConnection jdbc) {
+        final List<TableId> capturedTableIds;
+        try {
+            capturedTableIds =
+                    listTables(jdbc, sourceConfig.getTableFilters()).stream()
+                            .filter(tableId -> 
!existedTables.contains(tableId))
+                            .collect(Collectors.toList());
+        } catch (SQLException e) {
+            throw new FlinkRuntimeException("Failed to discover captured 
tables", e);
+        }
+        return capturedTableIds.isEmpty()
+                ? new HashMap<>()
+                : discoverSchemaForCapturedTables(capturedTableIds, 
sourceConfig, jdbc);
+    }
+
+    public static Map<TableId, TableChange> discoverSchemaForCapturedTables(
+            List<TableId> capturedTableIds, MySqlSourceConfig sourceConfig, 
MySqlConnection jdbc) {
         if (capturedTableIds.isEmpty()) {
             throw new IllegalArgumentException(
                     String.format(
-                            "Can't find any matched tables, please check your "
-                                    + "configured database-name: %s and 
table-name: %s",
+                            "Can't find any matched tables, please check your 
configured database-name: %s and table-name: %s",
                             sourceConfig.getDatabaseList(), 
sourceConfig.getTableList()));
         }
-
         // fetch table schemas
         MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, 
jdbc.isTableIdCaseSensitive());
-        Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
+        Map<TableId, TableChange> tableSchemas = new HashMap<>();
         for (TableId tableId : capturedTableIds) {
-            TableChanges.TableChange tableSchema = 
mySqlSchema.getTableSchema(jdbc, tableId);
+            TableChange tableSchema = mySqlSchema.getTableSchema(jdbc, 
tableId);
             tableSchemas.put(tableId, tableSchema);
         }
         return tableSchemas;

Reply via email to