This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d8d289b682 [Test][e2e] Jdbc test checking data consistency. (#5734)
d8d289b682 is described below
commit d8d289b6822b88e8fe4f49931710114cfed6c982
Author: MoSence <[email protected]>
AuthorDate: Wed Nov 1 19:09:38 2023 +0800
[Test][e2e] Jdbc test checking data consistency. (#5734)
---
.../connectors/seatunnel/jdbc/AbstractJdbcIT.java | 100 +++++++++-
.../connectors/seatunnel/jdbc/JdbcITErrorCode.java | 1 +
.../connectors/seatunnel/jdbc/JdbcDb2IT.java | 8 +-
.../connectors/seatunnel/jdbc/JdbcMysqlIT.java | 216 +++++++++++++++------
.../connectors/seatunnel/jdbc/JdbcOracleIT.java | 2 +-
...mysql_source_and_sink_parallel_upper_lower.conf | 4 +-
.../seatunnel/jdbc/JdbcOceanBaseITBase.java | 2 +-
.../connectors/seatunnel/jdbc/JdbcPhoenixIT.java | 2 +-
.../seatunnel/jdbc/JdbcStarRocksdbIT.java | 2 +-
.../connectors/seatunnel/jdbc/JdbcKingbaseIT.java | 4 +-
.../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 2 +-
.../connectors/seatunnel/jdbc/JdbcVerticaIT.java | 2 +-
.../connectors/seatunnel/jdbc/JdbcDmIT.java | 2 +-
.../connectors/seatunnel/jdbc/JdbcDmUpsetIT.java | 2 +-
.../connectors/seatunnel/jdbc/JdbcGBase8aIT.java | 2 +-
.../connectors/seatunnel/jdbc/JdbcGreenplumIT.java | 2 +-
.../seatunnel/jdbc/JdbcOracleLowercaseTableIT.java | 2 +-
.../seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java | 2 +-
18 files changed, 274 insertions(+), 83 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 3db2e36829..f61381d24b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
+import org.apache.seatunnel.shade.com.google.common.io.ByteStreams;
+import org.apache.seatunnel.shade.com.google.common.io.CharStreams;
+
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -37,23 +40,31 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.images.PullPolicy;
import org.testcontainers.lifecycle.Startables;
import com.github.dockerjava.api.model.Image;
-import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@@ -63,9 +74,10 @@ import java.util.stream.Stream;
import static org.awaitility.Awaitility.given;
-@Slf4j
public abstract class AbstractJdbcIT extends TestSuiteBase implements
TestResource {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
protected static final String HOST = "HOST";
@TestContainerExtension
@@ -88,7 +100,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
abstract JdbcCase getJdbcCase();
- abstract void compareResult() throws SQLException, IOException;
+ abstract void compareResult(String executeKey) throws SQLException,
IOException;
abstract String driverUrl();
@@ -319,12 +331,14 @@ public abstract class AbstractJdbcIT extends
TestSuiteBase implements TestResour
throws IOException, InterruptedException, SQLException {
List<String> configFiles = jdbcCase.getConfigFile();
for (String configFile : configFiles) {
- Container.ExecResult execResult = container.executeJob(configFile);
- Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ try {
+ Container.ExecResult execResult =
container.executeJob(configFile);
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ compareResult(String.format("%s in [%s]", configFile,
container.identifier()));
+ } finally {
+ clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSinkTable());
+ }
}
-
- compareResult();
- clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSinkTable());
}
protected void initCatalog() {}
@@ -363,4 +377,74 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
Assertions.assertFalse(catalog.databaseExists(targetTablePath.getDatabaseName()));
}
}
+
+ protected Object[] toArrayResult(ResultSet resultSet, String[] fieldNames)
+ throws SQLException, IOException {
+ List<Object> result = new ArrayList<>(0);
+ while (resultSet.next()) {
+ Object[] rowArray = new Object[fieldNames.length];
+ for (int colIndex = 0; colIndex < fieldNames.length; colIndex++) {
+ rowArray[colIndex] =
checkData(resultSet.getObject(fieldNames[colIndex]));
+ }
+ result.add(rowArray);
+ }
+ return result.toArray();
+ }
+
+ private Object checkData(Object data) throws SQLException, IOException {
+ if (data == null) {
+ return null;
+ } else if (data instanceof byte[]) {
+ return data;
+ } else if (data instanceof Clob) {
+ try (Reader reader = ((Clob) data).getCharacterStream()) {
+ return CharStreams.toString(reader);
+ }
+ } else if (data instanceof Blob) {
+ try (InputStream inputStream = ((Blob) data).getBinaryStream()) {
+ return ByteStreams.toByteArray(inputStream);
+ }
+ } else if (data instanceof Array) {
+ Object[] jdbcArray = (Object[]) ((Array) data).getArray();
+ Object[] javaArray = new Object[jdbcArray.length];
+ for (int index = 0; index < jdbcArray.length; index++) {
+ javaArray[index] = checkData(jdbcArray[index]);
+ }
+ return javaArray;
+ } else {
+ return data;
+ }
+ }
+
+ protected void defaultCompare(String executeKey, String[] fieldNames,
String sortKey) {
+ try (Statement statement = connection.createStatement()) {
+ ResultSet source =
+ statement.executeQuery(
+ String.format(
+ "SELECT * FROM %s ORDER BY %s",
+ buildTableInfoWithSchema(
+ this.jdbcCase.getSchema(),
+ this.jdbcCase.getSourceTable()),
+ quoteIdentifier(sortKey)));
+ Object[] sourceResult = toArrayResult(source, fieldNames);
+ ResultSet sink =
+ statement.executeQuery(
+ String.format(
+ "SELECT * FROM %s ORDER BY %s",
+ buildTableInfoWithSchema(
+ this.jdbcCase.getSchema(),
+ this.jdbcCase.getSinkTable()),
+ quoteIdentifier(sortKey)));
+ Object[] sinkResult = toArrayResult(sink, fieldNames);
+ log.warn(
+ "{}: source data count {}, sink data count {}.",
+ executeKey,
+ sourceResult.length,
+ sinkResult.length);
+ Assertions.assertArrayEquals(
+ sourceResult, sinkResult, String.format("[%s] data
compare", executeKey));
+ } catch (SQLException | IOException e) {
+ throw new
SeaTunnelRuntimeException(JdbcITErrorCode.DATA_COMPARISON_FAILED, e);
+ }
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java
index 965fb7ba85..ab101677b0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java
@@ -25,6 +25,7 @@ public enum JdbcITErrorCode implements SeaTunnelErrorCode {
CREATE_TABLE_FAILED("JDBC-IT-02", "Fail to create table."),
INSERT_DATA_FAILED("JDBC-IT-03", "Fail to inert data."),
DRIVER_NOT_FOUND("JDBC-IT-04", "Can not get the driver."),
+ DATA_COMPARISON_FAILED("JDBC-IT-05", "Source data is inconsistent with
target data."),
;
private final String code;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
index f258faa75f..a482b1790b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
@@ -29,7 +29,6 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerLoggerFactory;
import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.sql.Date;
@@ -41,7 +40,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-@Slf4j
public class JdbcDb2IT extends AbstractJdbcIT {
private static final String DB2_CONTAINER_HOST = "db2-e2e";
@@ -120,7 +118,7 @@ public class JdbcDb2IT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
@@ -205,7 +203,9 @@ public class JdbcDb2IT extends AbstractJdbcIT {
@Override
public void clearTable(String schema, String table) {
try (Statement statement = connection.createStatement()) {
- String truncate = String.format("delete from \"%s\".%s where
1=1;", schema, table);
+ String truncate =
+ String.format(
+ "delete from %s where 1=1;",
buildTableInfoWithSchema(schema, table));
statement.execute(truncate);
connection.commit();
} catch (SQLException e) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index 75c2b9324f..e2d2dcb56e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
@@ -53,7 +55,6 @@ import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
-import com.google.common.collect.Lists;
import com.mysql.cj.jdbc.ConnectionImpl;
import java.io.IOException;
@@ -179,7 +180,57 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ protected void compareResult(String executeKey) {
+ String[] fieldNames =
+ new String[] {
+ "c_bit_1",
+ "c_bit_8",
+ "c_bit_16",
+ "c_bit_32",
+ "c_bit_64",
+ "c_boolean",
+ "c_tinyint",
+ "c_tinyint_unsigned",
+ "c_smallint",
+ "c_smallint_unsigned",
+ "c_mediumint",
+ "c_mediumint_unsigned",
+ "c_int",
+ "c_integer",
+ "c_year",
+ "c_int_unsigned",
+ "c_integer_unsigned",
+ "c_bigint",
+ "c_bigint_unsigned",
+ "c_decimal",
+ "c_decimal_unsigned",
+ "c_float",
+ "c_float_unsigned",
+ "c_double",
+ "c_double_unsigned",
+ "c_char",
+ "c_tinytext",
+ "c_mediumtext",
+ "c_text",
+ "c_varchar",
+ "c_json",
+ "c_longtext",
+ "c_date",
+ "c_datetime",
+ "c_time",
+ "c_timestamp",
+ "c_tinyblob",
+ "c_mediumblob",
+ "c_blob",
+ "c_longblob",
+ "c_varbinary",
+ "c_binary",
+ "c_bigint_30",
+ "c_decimal_unsigned_30",
+ "c_decimal_30",
+ };
+ defaultCompare(executeKey, fieldNames, "c_bigint_30");
+ }
@Override
String driverUrl() {
@@ -242,58 +293,115 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
BigDecimal decimalValue = new
BigDecimal("999999999999999999999999999899");
for (int i = 0; i < 100; i++) {
byte byteArr = Integer.valueOf(i).byteValue();
- SeaTunnelRow row =
- new SeaTunnelRow(
- new Object[] {
- i % 2 == 0 ? (byte) 1 : (byte) 0,
- new byte[] {byteArr},
- new byte[] {byteArr, byteArr},
- new byte[] {byteArr, byteArr, byteArr,
byteArr},
- new byte[] {
- byteArr, byteArr, byteArr, byteArr,
byteArr, byteArr, byteArr,
- byteArr
- },
- i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE,
- i,
- i,
- i,
- i,
- i,
- i,
- i,
- i,
- i,
- Long.parseLong("1"),
- Long.parseLong("1"),
- Long.parseLong("1"),
- BigDecimal.valueOf(i, 0),
- BigDecimal.valueOf(i, 18),
- BigDecimal.valueOf(i, 18),
- Float.parseFloat("1.1"),
- Float.parseFloat("1.1"),
- Double.parseDouble("1.1"),
- Double.parseDouble("1.1"),
- "f",
- String.format("f1_%s", i),
- String.format("f1_%s", i),
- String.format("f1_%s", i),
- String.format("f1_%s", i),
- String.format("{\"aa\":\"bb_%s\"}", i),
- String.format("f1_%s", i),
- Date.valueOf(LocalDate.now()),
- Timestamp.valueOf(LocalDateTime.now()),
- Time.valueOf(LocalTime.now()),
- new Timestamp(System.currentTimeMillis()),
- "test".getBytes(),
- "test".getBytes(),
- "test".getBytes(),
- "test".getBytes(),
- "test".getBytes(),
- "f".getBytes(),
- bigintValue.add(BigDecimal.valueOf(i)),
- decimalValue.add(BigDecimal.valueOf(i)),
- decimalValue.add(BigDecimal.valueOf(i)),
- });
+ SeaTunnelRow row;
+ if (i == 99) {
+ row =
+ new SeaTunnelRow(
+ new Object[] {
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ //
https://github.com/apache/seatunnel/issues/5559 this value
+ // cannot set null, this null
+ // value column's row will be lost in
+ //
jdbc_mysql_source_and_sink_parallel.conf,jdbc_mysql_source_and_sink_parallel_upper_lower.conf.
+ bigintValue.add(BigDecimal.valueOf(i)),
+ decimalValue.add(BigDecimal.valueOf(i)),
+ null,
+ });
+ } else {
+ row =
+ new SeaTunnelRow(
+ new Object[] {
+ i % 2 == 0 ? (byte) 1 : (byte) 0,
+ new byte[] {byteArr},
+ new byte[] {byteArr, byteArr},
+ new byte[] {byteArr, byteArr, byteArr,
byteArr},
+ new byte[] {
+ byteArr, byteArr, byteArr, byteArr,
byteArr, byteArr,
+ byteArr, byteArr
+ },
+ i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ Long.parseLong("1"),
+ Long.parseLong("1"),
+ Long.parseLong("1"),
+ BigDecimal.valueOf(i, 0),
+ BigDecimal.valueOf(i, 18),
+ BigDecimal.valueOf(i, 18),
+ Float.parseFloat("1.1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ Double.parseDouble("1.1"),
+ "f",
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("{\"aa\":\"bb_%s\"}", i),
+ String.format("f1_%s", i),
+ Date.valueOf(LocalDate.now()),
+ Timestamp.valueOf(LocalDateTime.now()),
+ Time.valueOf(LocalTime.now()),
+ new Timestamp(System.currentTimeMillis()),
+ "test".getBytes(),
+ "test".getBytes(),
+ "test".getBytes(),
+ "test".getBytes(),
+ "test".getBytes(),
+ "f".getBytes(),
+ bigintValue.add(BigDecimal.valueOf(i)),
+ decimalValue.add(BigDecimal.valueOf(i)),
+ decimalValue.add(BigDecimal.valueOf(i)),
+ });
+ }
rows.add(row);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index 75bdffbd6c..e98da6a1ab 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -116,7 +116,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
index 1b092f1e91..213ba00899 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -29,8 +29,8 @@ source {
query = "select * from source"
partition_column = "c_bigint_30"
result_table_name = "jdbc"
- partition_lower_bound = 2844674407371055160
- partition_upper_bound = 2844674407371055259
+ partition_lower_bound = 2844674407371055000
+ partition_upper_bound = 2844674407371055099
partition_num = 5
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
index 50177ef1a8..6cdc38780a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -46,7 +46,7 @@ public abstract class JdbcOceanBaseITBase extends
AbstractJdbcIT {
abstract String getFullTableName(String tableName);
@Override
- void compareResult() {
+ void compareResult(String executeKey) {
String sourceSql =
String.format("select * from %s order by 1",
getFullTableName(OCEANBASE_SOURCE));
String sinkSql =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPhoenixIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPhoenixIT.java
index 34283f20d4..73ffef9947 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPhoenixIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPhoenixIT.java
@@ -122,7 +122,7 @@ public class JdbcPhoenixIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
index 93d9a289ab..e7fc94e642 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
@@ -108,7 +108,7 @@ public class JdbcStarRocksdbIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
index 17d53bb87d..9d662e619d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
@@ -28,7 +28,6 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerLoggerFactory;
import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.math.BigDecimal;
@@ -51,7 +50,6 @@ import java.util.stream.Collectors;
* engine does not support the TIME type.Two environment variables need to be
added to the spark
* container: "LANG"="C.UTF-8", "JAVA_TOOL_OPTIONS"="-Dfile.encoding=UTF8"
*/
-@Slf4j
@Disabled("Due to copyright reasons, you need to download the trial version km
license yourself")
public class JdbcKingbaseIT extends AbstractJdbcIT {
private static final String KINGBASE_IMAGE = "huzhihui/kingbase:v8r6";
@@ -124,7 +122,7 @@ public class JdbcKingbaseIT extends AbstractJdbcIT {
}
@Override
- void compareResult() throws SQLException, IOException {}
+ void compareResult(String executeKey) throws SQLException, IOException {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index 0a170ff4be..e56fa37573 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -105,7 +105,7 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
}
@Override
- void compareResult() throws SQLException, IOException {}
+ void compareResult(String executeKey) throws SQLException, IOException {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
index df87df5163..1ff6032737 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
@@ -91,7 +91,7 @@ public class JdbcVerticaIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
index 8b3ebe135e..f2b9097ffa 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
@@ -131,7 +131,7 @@ public class JdbcDmIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
index 7b9d52c1d5..d501f6a965 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
@@ -156,7 +156,7 @@ public class JdbcDmUpsetIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
protected void createNeededTables() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
index b5a5e83e46..388cf67ae9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
@@ -110,7 +110,7 @@ public class JdbcGBase8aIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
index c1846c999c..9c98c29a7a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
@@ -88,7 +88,7 @@ public class JdbcGreenplumIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
index b90387ec6d..378b76367c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
@@ -116,7 +116,7 @@ public class JdbcOracleLowercaseTableIT extends
AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
index f9a0343d0d..053b8c9332 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
@@ -154,7 +154,7 @@ public class JdbcMysqlSaveModeHandlerIT extends
AbstractJdbcIT {
}
@Override
- void compareResult() {
+ void compareResult(String executeKey) {
final TablePath tablePathSource = TablePath.of("seatunnel", "source");
final CatalogTable tableSource = catalog.getTable(tablePathSource);
final List<Column> columnsSource =
tableSource.getTableSchema().getColumns();