This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new ec368902f [Improve][Jdbc-sink] add database field to sink config 
(#4199)
ec368902f is described below

commit ec368902f4dce48fa26c1e5af2b16eda96e56404
Author: ic4y <[email protected]>
AuthorDate: Mon Feb 27 10:03:34 2023 +0800

    [Improve][Jdbc-sink] add database field to sink config (#4199)
    
    * [Improve][Jdbc-sink] add database field to sink config
---
 docs/en/connector-v2/sink/Jdbc.md                  | 12 +++++--
 release-note.md                                    |  1 +
 .../seatunnel/jdbc/config/JdbcConfig.java          |  4 +++
 .../seatunnel/jdbc/config/JdbcSinkOptions.java     |  4 ++-
 .../jdbc/internal/JdbcOutputFormatBuilder.java     | 37 +++++++++++++---------
 .../jdbc/internal/dialect/JdbcDialect.java         | 26 +++++++--------
 .../jdbc/internal/dialect/db2/DB2Dialect.java      |  2 +-
 .../jdbc/internal/dialect/dm/DmdbDialect.java      |  2 +-
 .../internal/dialect/gbase8a/Gbase8aDialect.java   |  2 +-
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |  4 +--
 .../internal/dialect/oracle/OracleDialect.java     |  5 +--
 .../internal/dialect/phoenix/PhoenixDialect.java   |  2 +-
 .../internal/dialect/psql/PostgresDialect.java     |  4 +--
 .../internal/dialect/redshift/RedshiftDialect.java |  2 +-
 .../internal/dialect/saphana/SapHanaDialect.java   |  2 +-
 .../internal/dialect/sqlite/SqliteDialect.java     |  4 +--
 .../dialect/sqlserver/SqlServerDialect.java        |  5 +--
 .../dialect/tablestore/TablestoreDialect.java      |  2 +-
 .../internal/dialect/teradata/TeradataDialect.java |  2 +-
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       |  6 +++-
 .../resources/jdbc_sink_auto_generate_sql.conf     |  3 +-
 .../jdbc_sink_auto_generate_upsql_sql.conf         |  4 +--
 .../test/resources/jdbc_sink_cdc_changelog.conf    |  4 +--
 .../engine/server/master/JobMetricsTest.java       |  8 ++---
 24 files changed, 88 insertions(+), 59 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index 6fcd13dd5..edff1f3f5 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -33,6 +33,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` 
to enable it.
 | user                                      | String  | No       | -           
  |
 | password                                  | String  | No       | -           
  |
 | query                                     | String  | No       | -           
  |
+| database                                  | String  | No       | -           
  |
 | table                                     | String  | No       | -           
  |
 | primary_keys                              | Array   | No       | -           
  |
 | support_upsert_by_query_primary_key_exist | Boolean | No       | false       
  |
@@ -67,9 +68,15 @@ The URL of the JDBC connection. Refer to a case: 
jdbc:postgresql://localhost/tes
 
 Use this sql write upstream input datas to database. e.g `INSERT ...`
 
+### database [string]
+
+Use this `database` and `table-name` auto-generate sql and receive upstream 
input datas write to database.
+
+This option is mutually exclusive with `query` and has a higher priority.
+
 ### table [string]
 
-Use this `table-name` auto-generate sql and receive upstream input datas write 
to database.
+Use `database` and this `table-name` auto-generate sql and receive upstream 
input datas write to database.
 
 This option is mutually exclusive with `query` and has a higher priority.
 
@@ -228,4 +235,5 @@ sink {
 - [Feature] Support CDC write DELETE/UPDATE/INSERT events 
([3378](https://github.com/apache/incubator-seatunnel/issues/3378))
 - [Feature] Support Doris JDBC Sink
 - [Feature] Support Redshift JDBC 
Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
-- [Improve] Add config item enable upsert by 
query([#3708](https://github.com/apache/incubator-seatunnel/pull/3708))
\ No newline at end of file
+- [Improve] Add config item enable upsert by 
query([#3708](https://github.com/apache/incubator-seatunnel/pull/3708))
+- [Improve] Add database field to sink 
config([#4199](https://github.com/apache/incubator-seatunnel/pull/4199))
\ No newline at end of file
diff --git a/release-note.md b/release-note.md
index f72c8ee69..fa33d6121 100644
--- a/release-note.md
+++ b/release-note.md
@@ -34,6 +34,7 @@
 - [API]Add parallelism and column projection interface #3829
 - [API]Add get source method to all source connector #3846
 - [Hive] Support read user-defined partitions #3842
+- [Jdbc] Add database field to sink config #4199
 ### Zeta Engine
 - [Chore] Remove unnecessary dependencies #3795
 - [Core] Improve job restart of all node down #3784
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
index 72efa6fc8..95a50ec60 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
@@ -56,12 +56,16 @@ public class JdbcConfig implements Serializable {
 
     public static final Option<Boolean> IS_EXACTLY_ONCE = 
Options.key("is_exactly_once").booleanType().defaultValue(true).withDescription("exactly
 once");
 
+    public static final Option<Boolean> GENERATE_SINK_SQL = 
Options.key("generate_sink_sql").booleanType().defaultValue(true).withDescription("generate
 sql using the database table");
+
     public static final Option<String> XA_DATA_SOURCE_CLASS_NAME = 
Options.key("xa_data_source_class_name").stringType().noDefaultValue().withDescription("data
 source class name");
 
     public static final Option<Integer> MAX_COMMIT_ATTEMPTS = 
Options.key("max_commit_attempts").intType().defaultValue(3).withDescription("max
 commit attempts");
 
     public static final Option<Integer> TRANSACTION_TIMEOUT_SEC = 
Options.key("transaction_timeout_sec").intType().defaultValue(-1).withDescription("transaction
 timeout (second)");
 
+    public static final Option<String> DATABASE = 
Options.key("database").stringType().noDefaultValue().withDescription("database");
+
     public static final Option<String> TABLE = 
Options.key("table").stringType().noDefaultValue().withDescription("table");
 
     public static final Option<List<String>> PRIMARY_KEYS = 
Options.key("primary_keys").listType().noDefaultValue().withDescription("primary
 keys");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
index 0aeb9205d..4df746297 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
@@ -35,6 +35,7 @@ public class JdbcSinkOptions implements Serializable {
     private JdbcConnectionOptions jdbcConnectionOptions;
     private boolean isExactlyOnce;
     public String simpleSQL;
+    private String database;
     private String table;
     private List<String> primaryKeys;
     private boolean supportUpsertByQueryPrimaryKeyExist;
@@ -45,7 +46,8 @@ public class JdbcSinkOptions implements Serializable {
             this.isExactlyOnce = true;
         }
 
-        if (config.hasPath(JdbcConfig.TABLE.key())) {
+        if (config.hasPath(JdbcConfig.TABLE.key()) && 
config.hasPath(JdbcConfig.DATABASE.key())) {
+            this.database = config.getString(JdbcConfig.DATABASE.key());
             this.table = config.getString(JdbcConfig.TABLE.key());
             if (config.hasPath(JdbcConfig.PRIMARY_KEYS.key())) {
                 this.primaryKeys = 
config.getStringList(JdbcConfig.PRIMARY_KEYS.key());
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 56d725dc4..fa57ef358 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -56,17 +56,18 @@ public class JdbcOutputFormatBuilder {
     public JdbcOutputFormat build() {
         JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory;
 
+        final String database = jdbcSinkOptions.getDatabase();
         final String table = jdbcSinkOptions.getTable();
         final List<String> primaryKeys = jdbcSinkOptions.getPrimaryKeys();
-        if (Strings.isNullOrEmpty(table)) {
+        if (Strings.isNullOrEmpty(table) && Strings.isNullOrEmpty(database)) {
             statementExecutorFactory = () -> createSimpleBufferedExecutor(
                 jdbcSinkOptions.getSimpleSQL(), seaTunnelRowType, 
dialect.getRowConverter());
         } else if (primaryKeys == null || primaryKeys.isEmpty()) {
             statementExecutorFactory = () -> createSimpleBufferedExecutor(
-                dialect, table, seaTunnelRowType);
+                dialect, database, table, seaTunnelRowType);
         } else {
             statementExecutorFactory = () -> createUpsertBufferedExecutor(
-                dialect, table, seaTunnelRowType,
+                dialect, database, table, seaTunnelRowType,
                 primaryKeys.toArray(new String[0]),
                 jdbcSinkOptions.isSupportUpsertByQueryPrimaryKeyExist());
         }
@@ -76,9 +77,10 @@ public class JdbcOutputFormatBuilder {
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createSimpleBufferedExecutor(JdbcDialect dialect,
+                                                                               
          String database,
                                                                                
          String table,
                                                                                
          SeaTunnelRowType rowType) {
-        String insertSQL = dialect.getInsertIntoStatement(table, 
rowType.getFieldNames());
+        String insertSQL = dialect.getInsertIntoStatement(database, table, 
rowType.getFieldNames());
         return createSimpleBufferedExecutor(insertSQL, rowType, 
dialect.getRowConverter());
     }
 
@@ -91,6 +93,7 @@ public class JdbcOutputFormatBuilder {
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createUpsertBufferedExecutor(JdbcDialect dialect,
+                                                                               
          String database,
                                                                                
          String table,
                                                                                
          SeaTunnelRowType rowType,
                                                                                
          String[] pkNames,
@@ -104,32 +107,34 @@ public class JdbcOutputFormatBuilder {
 
         Function<SeaTunnelRow, SeaTunnelRow> keyExtractor = 
createKeyExtractor(pkFields);
         JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor = 
createDeleteExecutor(
-            dialect, table, pkNames, pkTypes);
+            dialect, database, table, pkNames, pkTypes);
         JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor = 
createUpsertExecutor(
-            dialect, table, rowType, pkNames, pkTypes, keyExtractor, 
supportUpsertByQueryPrimaryKeyExist);
+            dialect, database, table, rowType, pkNames, pkTypes, keyExtractor, 
supportUpsertByQueryPrimaryKeyExist);
         return new BufferReducedBatchStatementExecutor(
             upsertExecutor, deleteExecutor, keyExtractor, Function.identity());
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createUpsertExecutor(JdbcDialect dialect,
+                                                                               
  String database,
                                                                                
  String table,
                                                                                
  SeaTunnelRowType rowType,
                                                                                
  String[] pkNames,
                                                                                
  SeaTunnelDataType[] pkTypes,
                                                                                
  Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
                                                                                
  boolean supportUpsertByQueryPrimaryKeyExist) {
-        return dialect.getUpsertStatement(table, rowType.getFieldNames(), 
pkNames)
+        return dialect.getUpsertStatement(database, table, 
rowType.getFieldNames(), pkNames)
             .map(upsertSQL -> createSimpleExecutor(upsertSQL, rowType, 
dialect.getRowConverter()))
             .orElseGet(() -> {
                 if (supportUpsertByQueryPrimaryKeyExist) {
                     return createInsertOrUpdateByQueryExecutor(
-                        dialect, table, rowType, pkNames, pkTypes, 
keyExtractor);
+                        dialect, database, table, rowType, pkNames, pkTypes, 
keyExtractor);
                 }
-                return createInsertOrUpdateExecutor(dialect, table, rowType, 
pkNames);
+                return createInsertOrUpdateExecutor(dialect, database, table, 
rowType, pkNames);
             });
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createInsertOrUpdateExecutor(JdbcDialect dialect,
+                                                                               
          String database,
                                                                                
          String table,
                                                                                
          SeaTunnelRowType rowType,
                                                                                
          String[] pkNames) {
@@ -137,17 +142,18 @@ public class JdbcOutputFormatBuilder {
         return new InsertOrUpdateBatchStatementExecutor(
             connection -> FieldNamedPreparedStatement.prepareStatement(
                 connection,
-                dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
+                dialect.getInsertIntoStatement(database, table, 
rowType.getFieldNames()),
                 rowType.getFieldNames()),
             connection -> FieldNamedPreparedStatement.prepareStatement(
                 connection,
-                dialect.getUpdateStatement(table, rowType.getFieldNames(), 
pkNames),
+                dialect.getUpdateStatement(database, table, 
rowType.getFieldNames(), pkNames),
                 rowType.getFieldNames()),
             rowType,
             dialect.getRowConverter());
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createInsertOrUpdateByQueryExecutor(JdbcDialect dialect,
+                                                                               
                 String database,
                                                                                
                 String table,
                                                                                
                 SeaTunnelRowType rowType,
                                                                                
                 String[] pkNames,
@@ -157,15 +163,15 @@ public class JdbcOutputFormatBuilder {
         return new InsertOrUpdateBatchStatementExecutor(
             connection -> FieldNamedPreparedStatement.prepareStatement(
                 connection,
-                dialect.getRowExistsStatement(table, pkNames),
+                dialect.getRowExistsStatement(database, table, pkNames),
                 pkNames),
             connection -> FieldNamedPreparedStatement.prepareStatement(
                 connection,
-                dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
+                dialect.getInsertIntoStatement(database, table, 
rowType.getFieldNames()),
                 rowType.getFieldNames()),
             connection -> FieldNamedPreparedStatement.prepareStatement(
                 connection,
-                dialect.getUpdateStatement(table, rowType.getFieldNames(), 
pkNames),
+                dialect.getUpdateStatement(database, table, 
rowType.getFieldNames(), pkNames),
                 rowType.getFieldNames()),
             keyRowType,
             keyExtractor,
@@ -174,10 +180,11 @@ public class JdbcOutputFormatBuilder {
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createDeleteExecutor(JdbcDialect dialect,
+                                                                               
  String database,
                                                                                
  String table,
                                                                                
  String[] pkNames,
                                                                                
  SeaTunnelDataType[] pkTypes) {
-        String deleteSQL = dialect.getDeleteStatement(table, pkNames);
+        String deleteSQL = dialect.getDeleteStatement(database, table, 
pkNames);
         return createSimpleExecutor(deleteSQL, pkNames, pkTypes, 
dialect.getRowConverter());
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 2d52dbe2a..e14191e3d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -80,15 +80,15 @@ public interface JdbcDialect extends Serializable {
      *
      * @return the dialects {@code INSERT INTO} statement.
      */
-    default String getInsertIntoStatement(String tableName, String[] 
fieldNames) {
+    default String getInsertIntoStatement(String database, String tableName, 
String[] fieldNames) {
         String columns = Arrays.stream(fieldNames)
             .map(this::quoteIdentifier)
             .collect(Collectors.joining(", "));
         String placeholders = Arrays.stream(fieldNames)
             .map(fieldName -> ":" + fieldName)
             .collect(Collectors.joining(", "));
-        return String.format("INSERT INTO %s (%s) VALUES (%s)",
-            quoteIdentifier(tableName), columns, placeholders);
+        return String.format("INSERT INTO %s.%s (%s) VALUES (%s)",
+            quoteIdentifier(database), quoteIdentifier(tableName), columns, 
placeholders);
     }
 
     /**
@@ -102,15 +102,15 @@ public interface JdbcDialect extends Serializable {
      *
      * @return the dialects {@code UPDATE} statement.
      */
-    default String getUpdateStatement(String tableName, String[] fieldNames, 
String[] conditionFields) {
+    default String getUpdateStatement(String database, String tableName, 
String[] fieldNames, String[] conditionFields) {
         String setClause = Arrays.stream(fieldNames)
             .map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), 
fieldName))
             .collect(Collectors.joining(", "));
         String conditionClause = Arrays.stream(conditionFields)
             .map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), 
fieldName))
             .collect(Collectors.joining(" AND "));
-        return String.format("UPDATE %s SET %s WHERE %s",
-            quoteIdentifier(tableName), setClause, conditionClause);
+        return String.format("UPDATE %s.%s SET %s WHERE %s",
+            quoteIdentifier(database), quoteIdentifier(tableName), setClause, 
conditionClause);
     }
 
     /**
@@ -124,12 +124,12 @@ public interface JdbcDialect extends Serializable {
      *
      * @return the dialects {@code DELETE} statement.
      */
-    default String getDeleteStatement(String tableName, String[] 
conditionFields) {
+    default String getDeleteStatement(String database, String tableName, 
String[] conditionFields) {
         String conditionClause = Arrays.stream(conditionFields)
             .map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), 
fieldName))
             .collect(Collectors.joining(" AND "));
-        return String.format("DELETE FROM %s WHERE %s",
-            quoteIdentifier(tableName), conditionClause);
+        return String.format("DELETE FROM %s.%s WHERE %s",
+            quoteIdentifier(database), quoteIdentifier(tableName), 
conditionClause);
     }
 
     /**
@@ -142,12 +142,12 @@ public interface JdbcDialect extends Serializable {
      *
      * @return the dialects {@code QUERY} statement.
      */
-    default String getRowExistsStatement(String tableName, String[] 
conditionFields) {
+    default String getRowExistsStatement(String database, String tableName, 
String[] conditionFields) {
         String fieldExpressions = Arrays.stream(conditionFields)
             .map(field -> format("%s = :%s", quoteIdentifier(field), field))
             .collect(Collectors.joining(" AND "));
-        return String.format("SELECT 1 FROM %s WHERE %s",
-            quoteIdentifier(tableName), fieldExpressions);
+        return String.format("SELECT 1 FROM %s.%s WHERE %s",
+            quoteIdentifier(database), quoteIdentifier(tableName), 
fieldExpressions);
     }
 
     /**
@@ -163,7 +163,7 @@ public interface JdbcDialect extends Serializable {
      * @return the dialects {@code UPSERT} statement or {@link 
Optional#empty()}.
      *
      */
-    Optional<String> getUpsertStatement(String tableName, String[] fieldNames, 
String[] uniqueKeyFields);
+    Optional<String> getUpsertStatement(String database, String tableName, 
String[] fieldNames, String[] uniqueKeyFields);
 
     /**
      * Different dialects optimize their PreparedStatement
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
index e5ccd1860..e2e3f8a79 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
@@ -41,7 +41,7 @@ public class DB2Dialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         return Optional.empty();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
index 3f7b374a5..4c3e9005f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
@@ -41,7 +41,7 @@ public class DmdbDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         return Optional.empty();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
index 1df71f2a3..cb0125e6c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
@@ -40,7 +40,7 @@ public class Gbase8aDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         return Optional.empty();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 8b1d3d391..33dd9141d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -51,11 +51,11 @@ public class MysqlDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         String updateClause = Arrays.stream(fieldNames)
             .map(fieldName -> quoteIdentifier(fieldName) + "=VALUES(" + 
quoteIdentifier(fieldName) + ")")
             .collect(Collectors.joining(", "));
-        String upsertSQL = getInsertIntoStatement(tableName, fieldNames) + " 
ON DUPLICATE KEY UPDATE " + updateClause;
+        String upsertSQL = getInsertIntoStatement(database, tableName, 
fieldNames) + " ON DUPLICATE KEY UPDATE " + updateClause;
         return Optional.of(upsertSQL);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index cadf4bd16..44a37b793 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -50,7 +50,7 @@ public class OracleDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         List<String> nonUniqueKeyFields = Arrays.stream(fieldNames)
             .filter(fieldName -> 
!Arrays.asList(uniqueKeyFields).contains(fieldName))
             .collect(Collectors.toList());
@@ -75,13 +75,14 @@ public class OracleDialect implements JdbcDialect {
             .collect(Collectors.joining(", "));
 
         String upsertSQL = String.format(
-            " MERGE INTO %s TARGET"
+            " MERGE INTO %s.%s TARGET"
                 + " USING (%s) SOURCE"
                 + " ON (%s) "
                 + " WHEN MATCHED THEN"
                 + " UPDATE SET %s"
                 + " WHEN NOT MATCHED THEN"
                 + " INSERT (%s) VALUES (%s)",
+            database,
             tableName,
             usingClause,
             onConditions,
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialect.java
index 6f78d54cd..dfd8e8028 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialect.java
@@ -40,7 +40,7 @@ public class PhoenixDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         return Optional.empty();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index 6b3b7d873..9cc0ea379 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -49,7 +49,7 @@ public class PostgresDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         String uniqueColumns = Arrays.stream(uniqueKeyFields)
             .map(this::quoteIdentifier)
             .collect(Collectors.joining(", "));
@@ -57,7 +57,7 @@ public class PostgresDialect implements JdbcDialect {
             .map(fieldName -> quoteIdentifier(fieldName) + "=EXCLUDED." + 
quoteIdentifier(fieldName))
             .collect(Collectors.joining(", "));
         String upsertSQL = String.format("%s ON CONFLICT (%s) DO UPDATE SET 
%s",
-            getInsertIntoStatement(tableName, fieldNames), uniqueColumns, 
updateClause);
+            getInsertIntoStatement(database, tableName, fieldNames), 
uniqueColumns, updateClause);
         return Optional.of(upsertSQL);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
index 01c44c4ec..574be9d41 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialect.java
@@ -40,7 +40,7 @@ public class RedshiftDialect  implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         return Optional.empty();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialect.java
index 5505e1918..2cd2c078f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialect.java
@@ -41,7 +41,7 @@ public class SapHanaDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         return Optional.empty();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
index bf361dc8c..270c8ac6c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
@@ -47,7 +47,7 @@ public class SqliteDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         String updateClause = Arrays.stream(fieldNames)
                 .map(fieldName -> quoteIdentifier(fieldName) + "=VALUES(" + 
quoteIdentifier(fieldName) + ")")
                 .collect(Collectors.joining(", "));
@@ -56,7 +56,7 @@ public class SqliteDialect implements JdbcDialect {
                 .map(this::quoteIdentifier)
                 .collect(Collectors.joining(","));
 
-        String upsertSQL = getInsertIntoStatement(tableName, fieldNames) + " 
ON CONFLICT(" + conflictFields + ") DO UPDATE SET " + updateClause;
+        String upsertSQL = getInsertIntoStatement(database, tableName, 
fieldNames) + " ON CONFLICT(" + conflictFields + ") DO UPDATE SET " + 
updateClause;
         return Optional.of(upsertSQL);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 0ac734dc0..5eb34619f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -43,7 +43,7 @@ public class SqlServerDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         List<String> nonUniqueKeyFields = Arrays.stream(fieldNames)
             .filter(fieldName -> 
!Arrays.asList(uniqueKeyFields).contains(fieldName))
             .collect(Collectors.toList());
@@ -67,13 +67,14 @@ public class SqlServerDialect implements JdbcDialect {
             .map(fieldName -> "[SOURCE]." + quoteIdentifier(fieldName))
             .collect(Collectors.joining(", "));
         String upsertSQL = String.format(
-            "MERGE INTO %s AS [TARGET]"
+            "MERGE INTO %s.%s AS [TARGET]"
                 + " USING (%s) AS [SOURCE]"
                 + " ON (%s)"
                 + " WHEN MATCHED THEN"
                 + " UPDATE SET %s"
                 + " WHEN NOT MATCHED THEN"
                 + " INSERT (%s) VALUES (%s);",
+            database,
             tableName,
             usingClause,
             onConditions,
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
index 6d44bfb60..de93d9e00 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
@@ -45,7 +45,7 @@ public class TablestoreDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         return Optional.empty();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java
index d824d2ef0..3a2c7da83 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java
@@ -41,7 +41,7 @@ public class TeradataDialect implements JdbcDialect {
     }
 
     @Override
-    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+    public Optional<String> getUpsertStatement(String database, String 
tableName, String[] fieldNames, String[] uniqueKeyFields) {
         return Optional.empty();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index a609dc1c7..fa26ca3d2 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -21,7 +21,9 @@ import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.A
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.BATCH_INTERVAL_MS;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.BATCH_SIZE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.DRIVER;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.GENERATE_SINK_SQL;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.IS_EXACTLY_ONCE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.MAX_COMMIT_ATTEMPTS;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.MAX_RETRIES;
@@ -52,17 +54,19 @@ public class JdbcSinkFactory implements TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
             .required(URL, DRIVER)
-            .exclusive(QUERY, TABLE)
             .optional(USER,
                 PASSWORD,
                 CONNECTION_CHECK_TIMEOUT_SEC,
                 BATCH_SIZE,
                 BATCH_INTERVAL_MS,
                 IS_EXACTLY_ONCE,
+                GENERATE_SINK_SQL,
                 AUTO_COMMIT,
                 SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST)
             .conditional(IS_EXACTLY_ONCE, true, XA_DATA_SOURCE_CLASS_NAME, 
MAX_COMMIT_ATTEMPTS, TRANSACTION_TIMEOUT_SEC)
             .conditional(IS_EXACTLY_ONCE, false, MAX_RETRIES)
+            .conditional(GENERATE_SINK_SQL, true, DATABASE, TABLE)
+            .conditional(GENERATE_SINK_SQL, false, QUERY)
             .conditional(SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST, true, 
PRIMARY_KEYS)
             .build();
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_sql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_sql.conf
index 188ebf524..33ec82209 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_sql.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_sql.conf
@@ -54,6 +54,7 @@ sink {
         user = test
         password = test
 
-        table = sink
+        database = test
+        table = "public.sink"
     }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_upsql_sql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_upsql_sql.conf
index 68eb1ba3b..3e5808c1d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_upsql_sql.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_auto_generate_upsql_sql.conf
@@ -53,8 +53,8 @@ sink {
         url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
         user = test
         password = test
-
-        table = sink
+        database = test
+        table = "public.sink"
         primary_keys = ["user_id"]
     }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
index 188c7bf5d..2d31ec4ab 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
@@ -64,8 +64,8 @@ sink {
         url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
         user = test
         password = test
-
-        table = sink
+        database = test
+        table = "public.sink"
         primary_keys = ["pk_id"]
     }
 }
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 1d66ab6e5..7121edbb3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -111,10 +111,10 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
         //check metrics
         JobMetrics jobMetrics = coordinatorService.getJobMetrics(JOB_3);
         System.out.println(jobMetrics.toJsonString());
-        assertTrue(80 <  (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
-        assertTrue(80 <  (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
-        assertTrue(80 <  (Long) 
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
-        assertTrue(80 <  (Long) 
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(1).value());
+        assertTrue(40 <  (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
+        assertTrue(40 <  (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
+        assertTrue(40 <  (Long) 
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
+        assertTrue(40 <  (Long) 
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(1).value());
     }
 
     private void startJob(Long jobid, String path, boolean 
isStartWithSavePoint){


Reply via email to