This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new ec368902f [Improve][Jdbc-sink] add database field to sink config
(#4199)
ec368902f is described below
commit ec368902f4dce48fa26c1e5af2b16eda96e56404
Author: ic4y <[email protected]>
AuthorDate: Mon Feb 27 10:03:34 2023 +0800
[Improve][Jdbc-sink] add database field to sink config (#4199)
* [Improve][Jdbc-sink] add database field to sink config
---
docs/en/connector-v2/sink/Jdbc.md | 12 +++++--
release-note.md | 1 +
.../seatunnel/jdbc/config/JdbcConfig.java | 4 +++
.../seatunnel/jdbc/config/JdbcSinkOptions.java | 4 ++-
.../jdbc/internal/JdbcOutputFormatBuilder.java | 37 +++++++++++++---------
.../jdbc/internal/dialect/JdbcDialect.java | 26 +++++++--------
.../jdbc/internal/dialect/db2/DB2Dialect.java | 2 +-
.../jdbc/internal/dialect/dm/DmdbDialect.java | 2 +-
.../internal/dialect/gbase8a/Gbase8aDialect.java | 2 +-
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 4 +--
.../internal/dialect/oracle/OracleDialect.java | 5 +--
.../internal/dialect/phoenix/PhoenixDialect.java | 2 +-
.../internal/dialect/psql/PostgresDialect.java | 4 +--
.../internal/dialect/redshift/RedshiftDialect.java | 2 +-
.../internal/dialect/saphana/SapHanaDialect.java | 2 +-
.../internal/dialect/sqlite/SqliteDialect.java | 4 +--
.../dialect/sqlserver/SqlServerDialect.java | 5 +--
.../dialect/tablestore/TablestoreDialect.java | 2 +-
.../internal/dialect/teradata/TeradataDialect.java | 2 +-
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 6 +++-
.../resources/jdbc_sink_auto_generate_sql.conf | 3 +-
.../jdbc_sink_auto_generate_upsql_sql.conf | 4 +--
.../test/resources/jdbc_sink_cdc_changelog.conf | 4 +--
.../engine/server/master/JobMetricsTest.java | 8 ++---
24 files changed, 88 insertions(+), 59 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index 6fcd13dd5..edff1f3f5 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -33,6 +33,7 @@ support `Xa transactions`. You can set `is_exactly_once=true`
to enable it.
| user | String | No | -
|
| password | String | No | -
|
| query | String | No | -
|
+| database | String | No | -
|
| table | String | No | -
|
| primary_keys | Array | No | -
|
| support_upsert_by_query_primary_key_exist | Boolean | No | false
|
@@ -67,9 +68,15 @@ The URL of the JDBC connection. Refer to a case:
jdbc:postgresql://localhost/tes
Use this sql write upstream input datas to database. e.g `INSERT ...`
+### database [string]
+
+Use this `database` and `table-name` auto-generate sql and receive upstream
input datas write to database.
+
+This option is mutually exclusive with `query` and has a higher priority.
+
### table [string]
-Use this `table-name` auto-generate sql and receive upstream input datas write
to database.
+Use `database` and this `table-name` auto-generate sql and receive upstream
input datas write to database.
This option is mutually exclusive with `query` and has a higher priority.
@@ -228,4 +235,5 @@ sink {
- [Feature] Support CDC write DELETE/UPDATE/INSERT events
([3378](https://github.com/apache/incubator-seatunnel/issues/3378))
- [Feature] Support Doris JDBC Sink
- [Feature] Support Redshift JDBC
Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
-- [Improve] Add config item enable upsert by
query([#3708](https://github.com/apache/incubator-seatunnel/pull/3708))
\ No newline at end of file
+- [Improve] Add config item enable upsert by
query([#3708](https://github.com/apache/incubator-seatunnel/pull/3708))
+- [Improve] Add database field to sink
config([#4199](https://github.com/apache/incubator-seatunnel/pull/4199))
\ No newline at end of file
diff --git a/release-note.md b/release-note.md
index f72c8ee69..fa33d6121 100644
--- a/release-note.md
+++ b/release-note.md
@@ -34,6 +34,7 @@
- [API]Add parallelism and column projection interface #3829
- [API]Add get source method to all source connector #3846
- [Hive] Support read user-defined partitions #3842
+- [Jdbc] Add database field to sink config #4199
### Zeta Engine
- [Chore] Remove unnecessary dependencies #3795
- [Core] Improve job restart of all node down #3784
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
index 72efa6fc8..95a50ec60 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
@@ -56,12 +56,16 @@ public class JdbcConfig implements Serializable {
public static final Option<Boolean> IS_EXACTLY_ONCE =
Options.key("is_exactly_once").booleanType().defaultValue(true).withDescription("exactly
once");
+ public static final Option<Boolean> GENERATE_SINK_SQL =
Options.key("generate_sink_sql").booleanType().defaultValue(true).withDescription("generate
sql using the database table");
+
public static final Option<String> XA_DATA_SOURCE_CLASS_NAME =
Options.key("xa_data_source_class_name").stringType().noDefaultValue().withDescription("data
source class name");
public static final Option<Integer> MAX_COMMIT_ATTEMPTS =
Options.key("max_commit_attempts").intType().defaultValue(3).withDescription("max
commit attempts");
public static final Option<Integer> TRANSACTION_TIMEOUT_SEC =
Options.key("transaction_timeout_sec").intType().defaultValue(-1).withDescription("transaction
timeout (second)");
+ public static final Option<String> DATABASE =
Options.key("database").stringType().noDefaultValue().withDescription("database");
+
public static final Option<String> TABLE =
Options.key("table").stringType().noDefaultValue().withDescription("table");
public static final Option<List<String>> PRIMARY_KEYS =
Options.key("primary_keys").listType().noDefaultValue().withDescription("primary
keys");
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
index 0aeb9205d..4df746297 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
@@ -35,6 +35,7 @@ public class JdbcSinkOptions implements Serializable {
private JdbcConnectionOptions jdbcConnectionOptions;
private boolean isExactlyOnce;
public String simpleSQL;
+ private String database;
private String table;
private List<String> primaryKeys;
private boolean supportUpsertByQueryPrimaryKeyExist;
@@ -45,7 +46,8 @@ public class JdbcSinkOptions implements Serializable {
this.isExactlyOnce = true;
}
- if (config.hasPath(JdbcConfig.TABLE.key())) {
+ if (config.hasPath(JdbcConfig.TABLE.key()) &&
config.hasPath(JdbcConfig.DATABASE.key())) {
+ this.database = config.getString(JdbcConfig.DATABASE.key());
this.table = config.getString(JdbcConfig.TABLE.key());
if (config.hasPath(JdbcConfig.PRIMARY_KEYS.key())) {
this.primaryKeys =
config.getStringList(JdbcConfig.PRIMARY_KEYS.key());
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 56d725dc4..fa57ef358 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -56,17 +56,18 @@ public class JdbcOutputFormatBuilder {
public JdbcOutputFormat build() {
JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory;
+ final String database = jdbcSinkOptions.getDatabase();
final String table = jdbcSinkOptions.getTable();
final List<String> primaryKeys = jdbcSinkOptions.getPrimaryKeys();
- if (Strings.isNullOrEmpty(table)) {
+ if (Strings.isNullOrEmpty(table) && Strings.isNullOrEmpty(database)) {
statementExecutorFactory = () -> createSimpleBufferedExecutor(
jdbcSinkOptions.getSimpleSQL(), seaTunnelRowType,
dialect.getRowConverter());
} else if (primaryKeys == null || primaryKeys.isEmpty()) {
statementExecutorFactory = () -> createSimpleBufferedExecutor(
- dialect, table, seaTunnelRowType);
+ dialect, database, table, seaTunnelRowType);
} else {
statementExecutorFactory = () -> createUpsertBufferedExecutor(
- dialect, table, seaTunnelRowType,
+ dialect, database, table, seaTunnelRowType,
primaryKeys.toArray(new String[0]),
jdbcSinkOptions.isSupportUpsertByQueryPrimaryKeyExist());
}
@@ -76,9 +77,10 @@ public class JdbcOutputFormatBuilder {
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createSimpleBufferedExecutor(JdbcDialect dialect,
+
String database,
String table,
SeaTunnelRowType rowType) {
- String insertSQL = dialect.getInsertIntoStatement(table,
rowType.getFieldNames());
+ String insertSQL = dialect.getInsertIntoStatement(database, table,
rowType.getFieldNames());
return createSimpleBufferedExecutor(insertSQL, rowType,
dialect.getRowConverter());
}
@@ -91,6 +93,7 @@ public class JdbcOutputFormatBuilder {
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createUpsertBufferedExecutor(JdbcDialect dialect,
+
String database,
String table,
SeaTunnelRowType rowType,
String[] pkNames,
@@ -104,32 +107,34 @@ public class JdbcOutputFormatBuilder {
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor =
createKeyExtractor(pkFields);
JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor =
createDeleteExecutor(
- dialect, table, pkNames, pkTypes);
+ dialect, database, table, pkNames, pkTypes);
JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor =
createUpsertExecutor(
- dialect, table, rowType, pkNames, pkTypes, keyExtractor,
supportUpsertByQueryPrimaryKeyExist);
+ dialect, database, table, rowType, pkNames, pkTypes, keyExtractor,
supportUpsertByQueryPrimaryKeyExist);
return new BufferReducedBatchStatementExecutor(
upsertExecutor, deleteExecutor, keyExtractor, Function.identity());
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createUpsertExecutor(JdbcDialect dialect,
+
String database,
String table,
SeaTunnelRowType rowType,
String[] pkNames,
SeaTunnelDataType[] pkTypes,
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
boolean supportUpsertByQueryPrimaryKeyExist) {
- return dialect.getUpsertStatement(table, rowType.getFieldNames(),
pkNames)
+ return dialect.getUpsertStatement(database, table,
rowType.getFieldNames(), pkNames)
.map(upsertSQL -> createSimpleExecutor(upsertSQL, rowType,
dialect.getRowConverter()))
.orElseGet(() -> {
if (supportUpsertByQueryPrimaryKeyExist) {
return createInsertOrUpdateByQueryExecutor(
- dialect, table, rowType, pkNames, pkTypes,
keyExtractor);
+ dialect, database, table, rowType, pkNames, pkTypes,
keyExtractor);
}
- return createInsertOrUpdateExecutor(dialect, table, rowType,
pkNames);
+ return createInsertOrUpdateExecutor(dialect, database, table,
rowType, pkNames);
});
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createInsertOrUpdateExecutor(JdbcDialect dialect,
+
String database,
String table,
SeaTunnelRowType rowType,
String[] pkNames) {
@@ -137,17 +142,18 @@ public class JdbcOutputFormatBuilder {
return new InsertOrUpdateBatchStatementExecutor(
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
- dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
+ dialect.getInsertIntoStatement(database, table,
rowType.getFieldNames()),
rowType.getFieldNames()),
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
- dialect.getUpdateStatement(table, rowType.getFieldNames(),
pkNames),
+ dialect.getUpdateStatement(database, table,
rowType.getFieldNames(), pkNames),
rowType.getFieldNames()),
rowType,
dialect.getRowConverter());
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createInsertOrUpdateByQueryExecutor(JdbcDialect dialect,
+
String database,
String table,
SeaTunnelRowType rowType,
String[] pkNames,
@@ -157,15 +163,15 @@ public class JdbcOutputFormatBuilder {
return new InsertOrUpdateBatchStatementExecutor(
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
- dialect.getRowExistsStatement(table, pkNames),
+ dialect.getRowExistsStatement(database, table, pkNames),
pkNames),
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
- dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
+ dialect.getInsertIntoStatement(database, table,
rowType.getFieldNames()),
rowType.getFieldNames()),
connection -> FieldNamedPreparedStatement.prepareStatement(
connection,
- dialect.getUpdateStatement(table, rowType.getFieldNames(),
pkNames),
+ dialect.getUpdateStatement(database, table,
rowType.getFieldNames(), pkNames),
rowType.getFieldNames()),
keyRowType,
keyExtractor,
@@ -174,10 +180,11 @@ public class JdbcOutputFormatBuilder {
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createDeleteExecutor(JdbcDialect dialect,
+
String database,
String table,
String[] pkNames,
SeaTunnelDataType[] pkTypes) {
- String deleteSQL = dialect.getDeleteStatement(table, pkNames);
+ String deleteSQL = dialect.getDeleteStatement(database, table,
pkNames);
return createSimpleExecutor(deleteSQL, pkNames, pkTypes,
dialect.getRowConverter());
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 2d52dbe2a..e14191e3d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -80,15 +80,15 @@ public interface JdbcDialect extends Serializable {
*
* @return the dialects {@code INSERT INTO} statement.
*/
- default String getInsertIntoStatement(String tableName, String[]
fieldNames) {
+ default String getInsertIntoStatement(String database, String tableName,
String[] fieldNames) {
String columns = Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders = Arrays.stream(fieldNames)
.map(fieldName -> ":" + fieldName)
.collect(Collectors.joining(", "));
- return String.format("INSERT INTO %s (%s) VALUES (%s)",
- quoteIdentifier(tableName), columns, placeholders);
+ return String.format("INSERT INTO %s.%s (%s) VALUES (%s)",
+ quoteIdentifier(database), quoteIdentifier(tableName), columns,
placeholders);
}
/**
@@ -102,15 +102,15 @@ public interface JdbcDialect extends Serializable {
*
* @return the dialects {@code UPDATE} statement.
*/
- default String getUpdateStatement(String tableName, String[] fieldNames,
String[] conditionFields) {
+ default String getUpdateStatement(String database, String tableName,
String[] fieldNames, String[] conditionFields) {
String setClause = Arrays.stream(fieldNames)
.map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName),
fieldName))
.collect(Collectors.joining(", "));
String conditionClause = Arrays.stream(conditionFields)
.map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName),
fieldName))
.collect(Collectors.joining(" AND "));
- return String.format("UPDATE %s SET %s WHERE %s",
- quoteIdentifier(tableName), setClause, conditionClause);
+ return String.format("UPDATE %s.%s SET %s WHERE %s",
+ quoteIdentifier(database), quoteIdentifier(tableName), setClause,
conditionClause);
}
/**
@@ -124,12 +124,12 @@ public interface JdbcDialect extends Serializable {
*
* @return the dialects {@code DELETE} statement.
*/
- default String getDeleteStatement(String tableName, String[]
conditionFields) {
+ default String getDeleteStatement(String database, String tableName,
String[] conditionFields) {
String conditionClause = Arrays.stream(conditionFields)
.map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName),
fieldName))
.collect(Collectors.joining(" AND "));
- return String.format("DELETE FROM %s WHERE %s",
- quoteIdentifier(tableName), conditionClause);
+ return String.format("DELETE FROM %s.%s WHERE %s",
+ quoteIdentifier(database), quoteIdentifier(tableName),
conditionClause);
}
/**
@@ -142,12 +142,12 @@ public interface JdbcDialect extends Serializable {
*
* @return the dialects {@code QUERY} statement.
*/
- default String getRowExistsStatement(String tableName, String[]
conditionFields) {
+ default String getRowExistsStatement(String database, String tableName,
String[] conditionFields) {
String fieldExpressions = Arrays.stream(conditionFields)
.map(field -> format("%s = :%s", quoteIdentifier(field), field))
.collect(Collectors.joining(" AND "));
- return String.format("SELECT 1 FROM %s WHERE %s",
- quoteIdentifier(tableName), fieldExpressions);
+ return String.format("SELECT 1 FROM %s.%s WHERE %s",
+ quoteIdentifier(database), quoteIdentifier(tableName),
fieldExpressions);
}
/**
@@ -163,7 +163,7 @@ public interface JdbcDialect extends Serializable {
* @return the dialects {@code UPSERT} statement or {@link
Optional#empty()}.
*
*/
- Optional<String> getUpsertStatement(String tableName, String[] fieldNames,
String[] uniqueKeyFields);
+ Optional<String> getUpsertStatement(String database, String tableName,
String[] fieldNames, String[] uniqueKeyFields);
/**
* Different dialects optimize their PreparedStatement
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
index e5ccd1860..e2e3f8a79 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
@@ -41,7 +41,7 @@ public class DB2Dialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
index 3f7b374a5..4c3e9005f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
@@ -41,7 +41,7 @@ public class DmdbDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
index 1df71f2a3..cb0125e6c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
@@ -40,7 +40,7 @@ public class Gbase8aDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 8b1d3d391..33dd9141d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -51,11 +51,11 @@ public class MysqlDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
String updateClause = Arrays.stream(fieldNames)
.map(fieldName -> quoteIdentifier(fieldName) + "=VALUES(" +
quoteIdentifier(fieldName) + ")")
.collect(Collectors.joining(", "));
- String upsertSQL = getInsertIntoStatement(tableName, fieldNames) + "
ON DUPLICATE KEY UPDATE " + updateClause;
+ String upsertSQL = getInsertIntoStatement(database, tableName,
fieldNames) + " ON DUPLICATE KEY UPDATE " + updateClause;
return Optional.of(upsertSQL);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index cadf4bd16..44a37b793 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -50,7 +50,7 @@ public class OracleDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
List<String> nonUniqueKeyFields = Arrays.stream(fieldNames)
.filter(fieldName ->
!Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
@@ -75,13 +75,14 @@ public class OracleDialect implements JdbcDialect {
.collect(Collectors.joining(", "));
String upsertSQL = String.format(
- " MERGE INTO %s TARGET"
+ " MERGE INTO %s.%s TARGET"
+ " USING (%s) SOURCE"
+ " ON (%s) "
+ " WHEN MATCHED THEN"
+ " UPDATE SET %s"
+ " WHEN NOT MATCHED THEN"
+ " INSERT (%s) VALUES (%s)",
+ database,
tableName,
usingClause,
onConditions,
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialect.java
index 6f78d54cd..dfd8e8028 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialect.java
@@ -40,7 +40,7 @@ public class PhoenixDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index 6b3b7d873..9cc0ea379 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -49,7 +49,7 @@ public class PostgresDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
String uniqueColumns = Arrays.stream(uniqueKeyFields)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
@@ -57,7 +57,7 @@ public class PostgresDialect implements JdbcDialect {
.map(fieldName -> quoteIdentifier(fieldName) + "=EXCLUDED." +
quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));
String upsertSQL = String.format("%s ON CONFLICT (%s) DO UPDATE SET
%s",
- getInsertIntoStatement(tableName, fieldNames), uniqueColumns,
updateClause);
+ getInsertIntoStatement(database, tableName, fieldNames),
uniqueColumns, updateClause);
return Optional.of(upsertSQL);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
index 01c44c4ec..574be9d41 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
@@ -40,7 +40,7 @@ public class RedshiftDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialect.java
index 5505e1918..2cd2c078f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialect.java
@@ -41,7 +41,7 @@ public class SapHanaDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
index bf361dc8c..270c8ac6c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
@@ -47,7 +47,7 @@ public class SqliteDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
String updateClause = Arrays.stream(fieldNames)
.map(fieldName -> quoteIdentifier(fieldName) + "=VALUES(" +
quoteIdentifier(fieldName) + ")")
.collect(Collectors.joining(", "));
@@ -56,7 +56,7 @@ public class SqliteDialect implements JdbcDialect {
.map(this::quoteIdentifier)
.collect(Collectors.joining(","));
- String upsertSQL = getInsertIntoStatement(tableName, fieldNames) + "
ON CONFLICT(" + conflictFields + ") DO UPDATE SET " + updateClause;
+ String upsertSQL = getInsertIntoStatement(database, tableName,
fieldNames) + " ON CONFLICT(" + conflictFields + ") DO UPDATE SET " +
updateClause;
return Optional.of(upsertSQL);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 0ac734dc0..5eb34619f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -43,7 +43,7 @@ public class SqlServerDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
List<String> nonUniqueKeyFields = Arrays.stream(fieldNames)
.filter(fieldName ->
!Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
@@ -67,13 +67,14 @@ public class SqlServerDialect implements JdbcDialect {
.map(fieldName -> "[SOURCE]." + quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));
String upsertSQL = String.format(
- "MERGE INTO %s AS [TARGET]"
+ "MERGE INTO %s.%s AS [TARGET]"
+ " USING (%s) AS [SOURCE]"
+ " ON (%s)"
+ " WHEN MATCHED THEN"
+ " UPDATE SET %s"
+ " WHEN NOT MATCHED THEN"
+ " INSERT (%s) VALUES (%s);",
+ database,
tableName,
usingClause,
onConditions,
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
index 6d44bfb60..de93d9e00 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
@@ -45,7 +45,7 @@ public class TablestoreDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java
index d824d2ef0..3a2c7da83 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java
@@ -41,7 +41,7 @@ public class TeradataDialect implements JdbcDialect {
}
@Override
- public Optional<String> getUpsertStatement(String tableName, String[]
fieldNames, String[] uniqueKeyFields) {
+ public Optional<String> getUpsertStatement(String database, String
tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index a609dc1c7..fa26ca3d2 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -21,7 +21,9 @@ import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.A
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.BATCH_INTERVAL_MS;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.BATCH_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.DRIVER;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.GENERATE_SINK_SQL;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.IS_EXACTLY_ONCE;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.MAX_COMMIT_ATTEMPTS;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.MAX_RETRIES;
@@ -52,17 +54,19 @@ public class JdbcSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(URL, DRIVER)
- .exclusive(QUERY, TABLE)
.optional(USER,
PASSWORD,
CONNECTION_CHECK_TIMEOUT_SEC,
BATCH_SIZE,
BATCH_INTERVAL_MS,
IS_EXACTLY_ONCE,
+ GENERATE_SINK_SQL,
AUTO_COMMIT,
SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST)
.conditional(IS_EXACTLY_ONCE, true, XA_DATA_SOURCE_CLASS_NAME,
MAX_COMMIT_ATTEMPTS, TRANSACTION_TIMEOUT_SEC)
.conditional(IS_EXACTLY_ONCE, false, MAX_RETRIES)
+ .conditional(GENERATE_SINK_SQL, true, DATABASE, TABLE)
+ .conditional(GENERATE_SINK_SQL, false, QUERY)
.conditional(SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST, true,
PRIMARY_KEYS)
.build();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_sql.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_sql.conf
index 188ebf524..33ec82209 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_sql.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_sql.conf
@@ -54,6 +54,7 @@ sink {
user = test
password = test
- table = sink
+ database = test
+ table = "public.sink"
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_upsql_sql.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_upsql_sql.conf
index 68eb1ba3b..3e5808c1d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_upsql_sql.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_upsql_sql.conf
@@ -53,8 +53,8 @@ sink {
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
-
- table = sink
+ database = test
+ table = "public.sink"
primary_keys = ["user_id"]
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
index 188c7bf5d..2d31ec4ab 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
@@ -64,8 +64,8 @@ sink {
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
-
- table = sink
+ database = test
+ table = "public.sink"
primary_keys = ["pk_id"]
}
}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 1d66ab6e5..7121edbb3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -111,10 +111,10 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
//check metrics
JobMetrics jobMetrics = coordinatorService.getJobMetrics(JOB_3);
System.out.println(jobMetrics.toJsonString());
- assertTrue(80 < (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
- assertTrue(80 < (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
- assertTrue(80 < (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
- assertTrue(80 < (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(1).value());
+ assertTrue(40 < (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
+ assertTrue(40 < (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
+ assertTrue(40 < (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
+ assertTrue(40 < (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(1).value());
}
private void startJob(Long jobid, String path, boolean
isStartWithSavePoint){