This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a8c6cc6e0c [Improve] Improve read table schema in cdc connector (#6702)
a8c6cc6e0c is described below
commit a8c6cc6e0cde5c813b697b695c7070700ff8591a
Author: Jia Fan <[email protected]>
AuthorDate: Mon Apr 15 20:18:10 2024 +0800
[Improve] Improve read table schema in cdc connector (#6702)
---
.../seatunnel/cdc/oracle/utils/OracleSchema.java | 23 ++++++++-------
.../cdc/postgres/utils/PostgresSchema.java | 33 +++++++++++-----------
.../sqlserver/source/utils/SqlServerSchema.java | 24 ++++++++--------
3 files changed, 43 insertions(+), 37 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
index 6524192845..f2713e3481 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
@@ -57,14 +57,12 @@ public class OracleSchema {
TableChange schema = schemasByTableId.get(tableId);
if (schema == null) {
schema = readTableSchema(jdbc, tableId);
- schemasByTableId.put(tableId, schema);
}
return schema;
}
private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
OracleConnection oracleConnection = (OracleConnection) jdbc;
- final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables();
try {
@@ -75,22 +73,27 @@ public class OracleSchema {
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false);
-
- Table table =
- CatalogTableUtils.mergeCatalogTableConfig(
- tables.forTable(tableId), tableMap.get(tableId));
- TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
- tableChangeMap.put(tableId, tableChange);
+ for (TableId id : tables.tableIds()) {
+ if (tableMap.containsKey(id)) {
+ Table table =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ tables.forTable(id), tableMap.get(id));
+ TableChanges.TableChange tableChange =
+ new TableChanges.TableChange(
+ TableChanges.TableChangeType.CREATE, table);
+ schemasByTableId.put(id, tableChange);
+ }
+ }
} catch (SQLException e) {
throw new SeaTunnelException(
String.format("Failed to read schema for table %s ", tableId), e);
}
- if (!tableChangeMap.containsKey(tableId)) {
+ if (!schemasByTableId.containsKey(tableId)) {
throw new SeaTunnelException(
String.format("Can't obtain schema for table %s ", tableId));
}
- return tableChangeMap.get(tableId);
+ return schemasByTableId.get(tableId);
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresSchema.java
index 7a9048b405..8470f7d95a 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresSchema.java
@@ -30,7 +30,6 @@ import io.debezium.relational.Tables;
import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,44 +51,46 @@ public class PostgresSchema {
TableChanges.TableChange schema = schemasByTableId.get(tableId);
if (schema == null) {
schema = readTableSchema(jdbc, tableId);
- schemasByTableId.put(tableId, schema);
}
return schema;
}
private TableChanges.TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
-
- CatalogTable catalogTable = tableMap.get(tableId);
// Because the catalog is null in the postgresConnection.readSchema method
- tableId = new TableId(null, tableId.schema(), tableId.table());
+ TableId tableIdWithoutCatalog = new TableId(null, tableId.schema(), tableId.table());
PostgresConnection postgresConnection = (PostgresConnection) jdbc;
- final Map<TableId, TableChanges.TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables();
try {
postgresConnection.readSchema(
tables,
- tableId.catalog(),
- tableId.schema(),
+ tableIdWithoutCatalog.catalog(),
+ tableIdWithoutCatalog.schema(),
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false);
- Table table =
- CatalogTableUtils.mergeCatalogTableConfig(
- tables.forTable(tableId), catalogTable);
- TableChanges.TableChange tableChange =
- new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
- tableChangeMap.put(tableId, tableChange);
+ for (TableId id : tables.tableIds()) {
+ TableId idWithCatalog = new TableId(tableId.catalog(), id.schema(), id.table());
+ if (tableMap.containsKey(idWithCatalog)) {
+ Table table =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ tables.forTable(id), tableMap.get(idWithCatalog));
+ TableChanges.TableChange tableChange =
+ new TableChanges.TableChange(
+ TableChanges.TableChangeType.CREATE, table);
+ schemasByTableId.put(idWithCatalog, tableChange);
+ }
+ }
} catch (SQLException e) {
throw new SeaTunnelException(
String.format("Failed to read schema for table %s ", tableId), e);
}
- if (!tableChangeMap.containsKey(tableId)) {
+ if (!schemasByTableId.containsKey(tableId)) {
throw new SeaTunnelException(
String.format("Can't obtain schema for table %s ", tableId));
}
- return tableChangeMap.get(tableId);
+ return schemasByTableId.get(tableId);
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
index 79f58e3e2d..77b2594f05 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
@@ -31,7 +31,6 @@ import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
import java.sql.SQLException;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -54,15 +53,12 @@ public class SqlServerSchema {
TableChange schema = schemasByTableId.get(tableId);
if (schema == null) {
schema = readTableSchema(jdbc, tableId);
- schemasByTableId.put(tableId, schema);
}
return schema;
}
private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc;
-
- final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables();
try {
sqlServerConnection.readSchema(
@@ -72,21 +68,27 @@ public class SqlServerSchema {
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false);
- Table table =
- CatalogTableUtils.mergeCatalogTableConfig(
- tables.forTable(tableId), tableMap.get(tableId));
- TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
- tableChangeMap.put(tableId, tableChange);
+ for (TableId id : tables.tableIds()) {
+ if (tableMap.containsKey(id)) {
+ Table table =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ tables.forTable(id), tableMap.get(id));
+ TableChanges.TableChange tableChange =
+ new TableChanges.TableChange(
+ TableChanges.TableChangeType.CREATE, table);
+ schemasByTableId.put(id, tableChange);
+ }
+ }
} catch (SQLException e) {
throw new SeaTunnelException(
String.format("Failed to read schema for table %s ", tableId), e);
}
- if (!tableChangeMap.containsKey(tableId)) {
+ if (!schemasByTableId.containsKey(tableId)) {
throw new SeaTunnelException(
String.format("Can't obtain schema for table %s ", tableId));
}
- return tableChangeMap.get(tableId);
+ return schemasByTableId.get(tableId);
}
}