This is an automated email from the ASF dual-hosted git repository. zhouyao2023 pushed a commit to branch 2.3.4-release in repository https://gitbox.apache.org/repos/asf/seatunnel.git
commit 1d2b65bfc89d98f2de9ec5368625b2d18b4de4a5 Author: Eric <[email protected]> AuthorDate: Wed Jan 24 19:53:02 2024 +0800 Fix Jdbc sink target table name error (#6269) --- .../connectors/seatunnel/jdbc/sink/JdbcSink.java | 12 +- .../seatunnel/jdbc/sink/JdbcSinkFactory.java | 161 +++++++++++---------- .../connector/cdc/sqlserver/SqlServerCDCIT.java | 2 +- 3 files changed, 90 insertions(+), 85 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index af95537bd2..599c0f5820 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -172,6 +172,10 @@ public class JdbcSink if (StringUtils.isBlank(jdbcSinkConfig.getTable())) { return Optional.empty(); } + // use query to write data can not support savemode + if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) { + return Optional.empty(); + } Optional<Catalog> catalogOptional = JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect); if (catalogOptional.isPresent()) { @@ -185,10 +189,10 @@ public class JdbcSink : fieldIdeEnumEnum.getValue(); TablePath tablePath = TablePath.of( - jdbcSinkConfig.getDatabase() - + "." - + CatalogUtils.quoteTableIdentifier( - jdbcSinkConfig.getTable(), fieldIde)); + catalogTable.getTableId().getDatabaseName(), + catalogTable.getTableId().getSchemaName(), + CatalogUtils.quoteTableIdentifier( + catalogTable.getTableId().getTableName(), fieldIde)); catalogTable.getOptions().put("fieldIde", fieldIde); return Optional.of( new DefaultSaveModeHandler( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index 4fe88e669f..eff6bb67c6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -99,91 +99,89 @@ public class JdbcSinkFactory implements TableSinkFactory { ReadonlyConfig catalogOptions = getCatalogOptions(context); Optional<String> optionalTable = config.getOptional(TABLE); Optional<String> optionalDatabase = config.getOptional(DATABASE); - Optional<String> queryOptional = config.getOptional(QUERY); - if (!optionalTable.isPresent() && !queryOptional.isPresent()) { + if (!optionalTable.isPresent()) { optionalTable = Optional.of(REPLACE_TABLE_NAME_KEY); - // get source table relevant information - TableIdentifier tableId = catalogTable.getTableId(); - String sourceDatabaseName = tableId.getDatabaseName(); - String sourceSchemaName = tableId.getSchemaName(); - String sourceTableName = tableId.getTableName(); - // get sink table relevant information - String sinkDatabaseName = optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY); - String sinkTableNameBefore = optionalTable.get(); - String[] sinkTableSplitArray = sinkTableNameBefore.split("\\."); - String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length - 1]; - String sinkSchemaName; - if (sinkTableSplitArray.length > 1) { - sinkSchemaName = sinkTableSplitArray[sinkTableSplitArray.length - 2]; - } else { - sinkSchemaName = null; - } - if (StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) { - sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA); - } - // to add tablePrefix and tableSuffix - String tempTableName; - String prefix = catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX); - String suffix = catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX); - if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) { - tempTableName = - StringUtils.isNotEmpty(prefix) ? prefix + sinkTableName : sinkTableName; - tempTableName = - StringUtils.isNotEmpty(suffix) ? tempTableName + suffix : tempTableName; + } + // get source table relevant information + TableIdentifier tableId = catalogTable.getTableId(); + String sourceDatabaseName = tableId.getDatabaseName(); + String sourceSchemaName = tableId.getSchemaName(); + String sourceTableName = tableId.getTableName(); + // get sink table relevant information + String sinkDatabaseName = optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY); + String sinkTableNameBefore = optionalTable.get(); + String[] sinkTableSplitArray = sinkTableNameBefore.split("\\."); + String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length - 1]; + String sinkSchemaName; + if (sinkTableSplitArray.length > 1) { + sinkSchemaName = sinkTableSplitArray[sinkTableSplitArray.length - 2]; + } else { + sinkSchemaName = null; + } + if (StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) { + sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA); + } + // to add tablePrefix and tableSuffix + String tempTableName; + String prefix = catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX); + String suffix = catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX); + if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) { + tempTableName = StringUtils.isNotEmpty(prefix) ? prefix + sinkTableName : sinkTableName; + tempTableName = StringUtils.isNotEmpty(suffix) ? tempTableName + suffix : tempTableName; - } else { - tempTableName = sinkTableName; - } - // to replace - String finalDatabaseName = sinkDatabaseName; - if (StringUtils.isNotEmpty(sourceDatabaseName)) { - finalDatabaseName = - sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName); - } + } else { + tempTableName = sinkTableName; + } + // to replace + String finalDatabaseName = sinkDatabaseName; + if (StringUtils.isNotEmpty(sourceDatabaseName)) { + finalDatabaseName = + sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName); + } - String finalSchemaName; - if (sinkSchemaName != null) { - if (sourceSchemaName == null) { - finalSchemaName = sinkSchemaName; - } else { - finalSchemaName = - sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName); - } + String finalSchemaName; + if (sinkSchemaName != null) { + if (sourceSchemaName == null) { + finalSchemaName = sinkSchemaName; } else { - finalSchemaName = null; - } - String finalTableName = sinkTableName; - if (StringUtils.isNotEmpty(sourceTableName)) { - finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY, sourceTableName); + finalSchemaName = sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName); } + } else { + finalSchemaName = null; + } + String finalTableName = sinkTableName; + if (StringUtils.isNotEmpty(sourceTableName)) { + finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY, sourceTableName); + } - // rebuild TableIdentifier and catalogTable - TableIdentifier newTableId = - TableIdentifier.of( - tableId.getCatalogName(), - finalDatabaseName, - finalSchemaName, - finalTableName); - catalogTable = - CatalogTable.of( - newTableId, - catalogTable.getTableSchema(), - catalogTable.getOptions(), - catalogTable.getPartitionKeys(), - catalogTable.getComment(), - catalogTable.getCatalogName()); - Map<String, String> map = config.toMap(); - if (catalogTable.getTableId().getSchemaName() != null) { - map.put( - TABLE.key(), - catalogTable.getTableId().getSchemaName() - + "." - + catalogTable.getTableId().getTableName()); - } else { - map.put(TABLE.key(), catalogTable.getTableId().getTableName()); - } - map.put(DATABASE.key(), catalogTable.getTableId().getDatabaseName()); - PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey(); + // rebuild TableIdentifier and catalogTable + TableIdentifier newTableId = + TableIdentifier.of( + tableId.getCatalogName(), + finalDatabaseName, + finalSchemaName, + finalTableName); + catalogTable = + CatalogTable.of( + newTableId, + catalogTable.getTableSchema(), + catalogTable.getOptions(), + catalogTable.getPartitionKeys(), + catalogTable.getComment(), + catalogTable.getCatalogName()); + Map<String, String> map = config.toMap(); + if (catalogTable.getTableId().getSchemaName() != null) { + map.put( + TABLE.key(), + catalogTable.getTableId().getSchemaName() + + "." + + catalogTable.getTableId().getTableName()); + } else { + map.put(TABLE.key(), catalogTable.getTableId().getTableName()); + } + map.put(DATABASE.key(), catalogTable.getTableId().getDatabaseName()); + PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey(); + if (!config.getOptional(PRIMARY_KEYS).isPresent()) { if (primaryKey != null && !CollectionUtils.isEmpty(primaryKey.getColumnNames())) { map.put(PRIMARY_KEYS.key(), String.join(",", primaryKey.getColumnNames())); } else { @@ -202,12 +200,15 @@ public class JdbcSinkFactory implements TableSinkFactory { .collect(Collectors.joining(","))); } } - config = ReadonlyConfig.fromMap(new HashMap<>(map)); } + config = ReadonlyConfig.fromMap(new HashMap<>(map)); // always execute final ReadonlyConfig options = config; JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config); FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE); + catalogTable + .getOptions() + .put("fieldIde", fieldIdeEnum == null ? null : fieldIdeEnum.getValue()); JdbcDialect dialect = JdbcDialectLoader.load( sinkConfig.getJdbcConnectionConfig().getUrl(), diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java index 1216c69645..42d2340817 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java @@ -65,7 +65,7 @@ import static org.awaitility.Awaitility.await; @Slf4j @DisabledOnContainer( value = {}, - type = {EngineType.SPARK}, + type = {EngineType.SPARK, EngineType.FLINK}, disabledReason = "Currently SPARK do not support cdc") public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
