This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new adbece9c2 [INLONG-7790][Sort] Capture newly added tables when setting
"scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset"
(#7794)
adbece9c2 is described below
commit adbece9c227c80372a60d3ab4ea1d765d2f32510
Author: emhui <[email protected]>
AuthorDate: Tue Apr 11 15:36:32 2023 +0800
[INLONG-7790][Sort] Capture newly added tables when setting
"scan.newly-added-table.enabled=true" and "scan.startup.mode=latest-offset"
(#7794)
---
.../cdc/mysql/source/reader/MySqlSourceReader.java | 18 +++++++++---
.../mysql/source/utils/TableDiscoveryUtils.java | 33 ++++++++++++++++++----
2 files changed, 41 insertions(+), 10 deletions(-)
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 0a7ea82c6..10ae8b4f5 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -235,12 +235,22 @@ public class MySqlSourceReader<T>
private MySqlBinlogSplit
discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
final String splitId = split.splitId();
- if (split.getTableSchemas().isEmpty()) {
+ if (split.getTableSchemas().isEmpty() ||
sourceConfig.isScanNewlyAddedTableEnabled()) {
try (MySqlConnection jdbc =
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
- Map<TableId, TableChanges.TableChange> tableSchemas =
-
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
- LOG.info("The table schema discovery for binlog split {}
success", splitId);
+ Map<TableId, TableChanges.TableChange> tableSchemas;
+ if (split.getTableSchemas().isEmpty()) {
+ tableSchemas =
+
TableDiscoveryUtils.discoverSchemaForCapturedTableSchemas(sourceConfig, jdbc);
+ LOG.info("The table schema discovery for binlog split {}
success", splitId);
+ } else {
+ List<TableId> existedTables = new
ArrayList<>(split.getTableSchemas().keySet());
+ tableSchemas =
+
TableDiscoveryUtils.discoverSchemaForNewAddedTables(existedTables,
sourceConfig, jdbc);
+ LOG.info(
+ "The table schema discovery for new added tables
of binlog split {} success",
+ split.splitId());
+ }
return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
} catch (SQLException e) {
LOG.error("Failed to obtains table schemas due to {}",
e.getMessage());
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
index abf925e89..005f36332 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
@@ -22,6 +22,8 @@ import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.stream.Collectors;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.sort.cdc.mysql.schema.MySqlSchema;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
@@ -102,7 +104,7 @@ public class TableDiscoveryUtils {
/**
* Discover schemas of table.
*/
- public static Map<TableId, TableChanges.TableChange>
discoverCapturedTableSchemas(
+ public static Map<TableId, TableChanges.TableChange>
discoverSchemaForCapturedTableSchemas(
MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
final List<TableId> capturedTableIds;
try {
@@ -110,19 +112,38 @@ public class TableDiscoveryUtils {
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to discover captured
tables", e);
}
+ return discoverSchemaForCapturedTables(capturedTableIds, sourceConfig,
jdbc);
+ }
+
+ public static Map<TableId, TableChange> discoverSchemaForNewAddedTables(
+ List<TableId> existedTables, MySqlSourceConfig sourceConfig,
MySqlConnection jdbc) {
+ final List<TableId> capturedTableIds;
+ try {
+ capturedTableIds =
+ listTables(jdbc, sourceConfig.getTableFilters()).stream()
+ .filter(tableId ->
!existedTables.contains(tableId))
+ .collect(Collectors.toList());
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException("Failed to discover captured
tables", e);
+ }
+ return capturedTableIds.isEmpty()
+ ? new HashMap<>()
+ : discoverSchemaForCapturedTables(capturedTableIds,
sourceConfig, jdbc);
+ }
+
+ public static Map<TableId, TableChange> discoverSchemaForCapturedTables(
+ List<TableId> capturedTableIds, MySqlSourceConfig sourceConfig,
MySqlConnection jdbc) {
if (capturedTableIds.isEmpty()) {
throw new IllegalArgumentException(
String.format(
- "Can't find any matched tables, please check your "
- + "configured database-name: %s and
table-name: %s",
+ "Can't find any matched tables, please check your
configured database-name: %s and table-name: %s",
sourceConfig.getDatabaseList(),
sourceConfig.getTableList()));
}
-
// fetch table schemas
MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig,
jdbc.isTableIdCaseSensitive());
- Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
+ Map<TableId, TableChange> tableSchemas = new HashMap<>();
for (TableId tableId : capturedTableIds) {
- TableChanges.TableChange tableSchema =
mySqlSchema.getTableSchema(jdbc, tableId);
+ TableChange tableSchema = mySqlSchema.getTableSchema(jdbc,
tableId);
tableSchemas.put(tableId, tableSchema);
}
return tableSchemas;