This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2f62235e38 Fix Jdbc sink target table name error (#6269)
2f62235e38 is described below
commit 2f62235e389d9196f45677a638e62379ca8f8fa9
Author: Eric <[email protected]>
AuthorDate: Wed Jan 24 19:53:02 2024 +0800
Fix Jdbc sink target table name error (#6269)
---
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 12 +-
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 161 +++++++++++----------
.../connector/cdc/sqlserver/SqlServerCDCIT.java | 2 +-
3 files changed, 90 insertions(+), 85 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index af95537bd2..599c0f5820 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -172,6 +172,10 @@ public class JdbcSink
if (StringUtils.isBlank(jdbcSinkConfig.getTable())) {
return Optional.empty();
}
+ // use query to write data can not support savemode
+ if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
+ return Optional.empty();
+ }
Optional<Catalog> catalogOptional =
JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect);
if (catalogOptional.isPresent()) {
@@ -185,10 +189,10 @@ public class JdbcSink
: fieldIdeEnumEnum.getValue();
TablePath tablePath =
TablePath.of(
- jdbcSinkConfig.getDatabase()
- + "."
- +
CatalogUtils.quoteTableIdentifier(
- jdbcSinkConfig.getTable(),
fieldIde));
+
catalogTable.getTableId().getDatabaseName(),
+ catalogTable.getTableId().getSchemaName(),
+ CatalogUtils.quoteTableIdentifier(
+
catalogTable.getTableId().getTableName(), fieldIde));
catalogTable.getOptions().put("fieldIde", fieldIde);
return Optional.of(
new DefaultSaveModeHandler(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 4fe88e669f..eff6bb67c6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -99,91 +99,89 @@ public class JdbcSinkFactory implements TableSinkFactory {
ReadonlyConfig catalogOptions = getCatalogOptions(context);
Optional<String> optionalTable = config.getOptional(TABLE);
Optional<String> optionalDatabase = config.getOptional(DATABASE);
- Optional<String> queryOptional = config.getOptional(QUERY);
- if (!optionalTable.isPresent() && !queryOptional.isPresent()) {
+ if (!optionalTable.isPresent()) {
optionalTable = Optional.of(REPLACE_TABLE_NAME_KEY);
- // get source table relevant information
- TableIdentifier tableId = catalogTable.getTableId();
- String sourceDatabaseName = tableId.getDatabaseName();
- String sourceSchemaName = tableId.getSchemaName();
- String sourceTableName = tableId.getTableName();
- // get sink table relevant information
- String sinkDatabaseName =
optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY);
- String sinkTableNameBefore = optionalTable.get();
- String[] sinkTableSplitArray = sinkTableNameBefore.split("\\.");
- String sinkTableName =
sinkTableSplitArray[sinkTableSplitArray.length - 1];
- String sinkSchemaName;
- if (sinkTableSplitArray.length > 1) {
- sinkSchemaName =
sinkTableSplitArray[sinkTableSplitArray.length - 2];
- } else {
- sinkSchemaName = null;
- }
- if
(StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) {
- sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA);
- }
- // to add tablePrefix and tableSuffix
- String tempTableName;
- String prefix =
catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX);
- String suffix =
catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX);
- if (StringUtils.isNotEmpty(prefix) ||
StringUtils.isNotEmpty(suffix)) {
- tempTableName =
- StringUtils.isNotEmpty(prefix) ? prefix +
sinkTableName : sinkTableName;
- tempTableName =
- StringUtils.isNotEmpty(suffix) ? tempTableName +
suffix : tempTableName;
+ }
+ // get source table relevant information
+ TableIdentifier tableId = catalogTable.getTableId();
+ String sourceDatabaseName = tableId.getDatabaseName();
+ String sourceSchemaName = tableId.getSchemaName();
+ String sourceTableName = tableId.getTableName();
+ // get sink table relevant information
+ String sinkDatabaseName =
optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY);
+ String sinkTableNameBefore = optionalTable.get();
+ String[] sinkTableSplitArray = sinkTableNameBefore.split("\\.");
+ String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length
- 1];
+ String sinkSchemaName;
+ if (sinkTableSplitArray.length > 1) {
+ sinkSchemaName = sinkTableSplitArray[sinkTableSplitArray.length -
2];
+ } else {
+ sinkSchemaName = null;
+ }
+ if
(StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) {
+ sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA);
+ }
+ // to add tablePrefix and tableSuffix
+ String tempTableName;
+ String prefix = catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX);
+ String suffix = catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX);
+ if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) {
+ tempTableName = StringUtils.isNotEmpty(prefix) ? prefix +
sinkTableName : sinkTableName;
+ tempTableName = StringUtils.isNotEmpty(suffix) ? tempTableName +
suffix : tempTableName;
- } else {
- tempTableName = sinkTableName;
- }
- // to replace
- String finalDatabaseName = sinkDatabaseName;
- if (StringUtils.isNotEmpty(sourceDatabaseName)) {
- finalDatabaseName =
- sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY,
sourceDatabaseName);
- }
+ } else {
+ tempTableName = sinkTableName;
+ }
+ // to replace
+ String finalDatabaseName = sinkDatabaseName;
+ if (StringUtils.isNotEmpty(sourceDatabaseName)) {
+ finalDatabaseName =
+ sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY,
sourceDatabaseName);
+ }
- String finalSchemaName;
- if (sinkSchemaName != null) {
- if (sourceSchemaName == null) {
- finalSchemaName = sinkSchemaName;
- } else {
- finalSchemaName =
- sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY,
sourceSchemaName);
- }
+ String finalSchemaName;
+ if (sinkSchemaName != null) {
+ if (sourceSchemaName == null) {
+ finalSchemaName = sinkSchemaName;
} else {
- finalSchemaName = null;
- }
- String finalTableName = sinkTableName;
- if (StringUtils.isNotEmpty(sourceTableName)) {
- finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY,
sourceTableName);
+ finalSchemaName =
sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName);
}
+ } else {
+ finalSchemaName = null;
+ }
+ String finalTableName = sinkTableName;
+ if (StringUtils.isNotEmpty(sourceTableName)) {
+ finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY,
sourceTableName);
+ }
- // rebuild TableIdentifier and catalogTable
- TableIdentifier newTableId =
- TableIdentifier.of(
- tableId.getCatalogName(),
- finalDatabaseName,
- finalSchemaName,
- finalTableName);
- catalogTable =
- CatalogTable.of(
- newTableId,
- catalogTable.getTableSchema(),
- catalogTable.getOptions(),
- catalogTable.getPartitionKeys(),
- catalogTable.getComment(),
- catalogTable.getCatalogName());
- Map<String, String> map = config.toMap();
- if (catalogTable.getTableId().getSchemaName() != null) {
- map.put(
- TABLE.key(),
- catalogTable.getTableId().getSchemaName()
- + "."
- + catalogTable.getTableId().getTableName());
- } else {
- map.put(TABLE.key(), catalogTable.getTableId().getTableName());
- }
- map.put(DATABASE.key(),
catalogTable.getTableId().getDatabaseName());
- PrimaryKey primaryKey =
catalogTable.getTableSchema().getPrimaryKey();
+ // rebuild TableIdentifier and catalogTable
+ TableIdentifier newTableId =
+ TableIdentifier.of(
+ tableId.getCatalogName(),
+ finalDatabaseName,
+ finalSchemaName,
+ finalTableName);
+ catalogTable =
+ CatalogTable.of(
+ newTableId,
+ catalogTable.getTableSchema(),
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment(),
+ catalogTable.getCatalogName());
+ Map<String, String> map = config.toMap();
+ if (catalogTable.getTableId().getSchemaName() != null) {
+ map.put(
+ TABLE.key(),
+ catalogTable.getTableId().getSchemaName()
+ + "."
+ + catalogTable.getTableId().getTableName());
+ } else {
+ map.put(TABLE.key(), catalogTable.getTableId().getTableName());
+ }
+ map.put(DATABASE.key(), catalogTable.getTableId().getDatabaseName());
+ PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+ if (!config.getOptional(PRIMARY_KEYS).isPresent()) {
if (primaryKey != null &&
!CollectionUtils.isEmpty(primaryKey.getColumnNames())) {
map.put(PRIMARY_KEYS.key(), String.join(",",
primaryKey.getColumnNames()));
} else {
@@ -202,12 +200,15 @@ public class JdbcSinkFactory implements TableSinkFactory {
.collect(Collectors.joining(",")));
}
}
- config = ReadonlyConfig.fromMap(new HashMap<>(map));
}
+ config = ReadonlyConfig.fromMap(new HashMap<>(map));
// always execute
final ReadonlyConfig options = config;
JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
+ catalogTable
+ .getOptions()
+ .put("fieldIde", fieldIdeEnum == null ? null :
fieldIdeEnum.getValue());
JdbcDialect dialect =
JdbcDialectLoader.load(
sinkConfig.getJdbcConnectionConfig().getUrl(),
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 1216c69645..42d2340817 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -65,7 +65,7 @@ import static org.awaitility.Awaitility.await;
@Slf4j
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK},
+ type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK do not support cdc")
public class SqlServerCDCIT extends TestSuiteBase implements TestResource {