This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ee3b7c3723 [Improve][JDBC Source] Fix Split can not be cancel (#6825)
ee3b7c3723 is described below
commit ee3b7c37239f6f696fe6bb8f43bd7078465ff837
Author: Eric <[email protected]>
AuthorDate: Tue May 14 10:19:11 2024 +0800
[Improve][JDBC Source] Fix Split can not be cancel (#6825)
---
.../enumerator/splitter/AbstractJdbcSourceChunkSplitter.java | 2 +-
.../base/source/enumerator/splitter/JdbcSourceChunkSplitter.java | 4 ++--
.../src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java | 2 +-
.../enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java | 2 +-
.../seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java | 2 +-
.../connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java | 5 ++++-
.../cdc/oracle/source/eumerator/OracleChunkSplitter.java | 2 +-
.../connectors/seatunnel/cdc/oracle/utils/OracleUtils.java | 5 ++++-
.../cdc/postgres/source/enumerator/PostgresChunkSplitter.java | 4 ++--
.../connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java | 5 ++++-
.../sqlserver/source/source/eumerator/SqlServerChunkSplitter.java | 2 +-
.../seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java | 5 ++++-
.../connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java | 5 ++++-
.../seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java | 5 ++++-
.../seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java | 4 ++--
.../connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java | 8 ++++----
.../apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java | 3 +--
.../apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java | 2 +-
.../seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java | 2 +-
19 files changed, 43 insertions(+), 26 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index e10c70795b..60a208de86 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -114,7 +114,7 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
}
private List<ChunkRange> splitTableIntoChunks(
- JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
+ JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
Exception {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
index 3981ddfa7c..d64469d3b3 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -81,7 +81,7 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
@Deprecated
Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int
samplingRate)
- throws SQLException;
+ throws Exception;
/**
* Performs a sampling operation on the specified column of a table in a
JDBC-connected
@@ -97,7 +97,7 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
*/
default Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, Column column, int
samplingRate)
- throws SQLException {
+ throws Exception {
return sampleDataFromColumn(jdbc, tableId, column.name(),
samplingRate);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
index 32617fe18c..d7bf573cc1 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
@@ -83,7 +83,7 @@ public class JdbcSourceChunkSplitterTest {
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int
samplingRate)
- throws SQLException {
+ throws Exception {
return new Object[0];
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
index 6f646eb6be..f89e720c43 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
@@ -195,7 +195,7 @@ public class AbstractJdbcSourceChunkSplitterTest {
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int
samplingRate)
- throws SQLException {
+ throws Exception {
return new Object[0];
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index b4982f2cbe..732b21e395 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -57,7 +57,7 @@ public class MySqlChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
- throws SQLException {
+ throws Exception {
return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName,
inverseSamplingRate);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 9b06ddda96..777be9d1d6 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -146,7 +146,7 @@ public class MySqlUtils {
public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
- throws SQLException {
+ throws Exception {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName),
quote(tableId));
@@ -172,6 +172,9 @@ public class MySqlUtils {
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Thread interrupted");
+ }
}
} finally {
if (rs != null) {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
index 52df70cbc8..6525c3a2db 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
@@ -61,7 +61,7 @@ public class OracleChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
- throws SQLException {
+ throws Exception {
return OracleUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, inverseSamplingRate);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
index 8d67c0f141..cad2a3c836 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
@@ -150,7 +150,7 @@ public class OracleUtils {
public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
- throws SQLException {
+ throws Exception {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName),
quoteSchemaAndTable(tableId));
@@ -176,6 +176,9 @@ public class OracleUtils {
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Thread interrupted");
+ }
}
} finally {
if (rs != null) {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
index 2aab573d2e..fb1aec572d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
@@ -70,7 +70,7 @@ public class PostgresChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
- throws SQLException {
+ throws Exception {
return PostgresUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, null, inverseSamplingRate);
}
@@ -78,7 +78,7 @@ public class PostgresChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, Column column, int
inverseSamplingRate)
- throws SQLException {
+ throws Exception {
return PostgresUtils.skipReadAndSortSampleData(
jdbc, tableId, column.name(), column, inverseSamplingRate);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
index b5cd090453..09ea768aa2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
@@ -161,7 +161,7 @@ public class PostgresUtils {
String columnName,
Column column,
int inverseSamplingRate)
- throws SQLException {
+ throws Exception {
columnName = quote(columnName);
if (column != null) {
columnName = JDBC_DIALECT.convertType(columnName,
column.typeName());
@@ -187,6 +187,9 @@ public class PostgresUtils {
if (count % 100000 == 0) {
log.info("Processing row index: {}", count);
}
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Thread interrupted");
+ }
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index b6698f5319..b59bb7789d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -57,7 +57,7 @@ public class SqlServerChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
- throws SQLException {
+ throws Exception {
return SqlServerUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, inverseSamplingRate);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index db1872fa64..11c2822d15 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -151,7 +151,7 @@ public class SqlServerUtils {
public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
- throws SQLException {
+ throws Exception {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName),
quote(tableId));
@@ -177,6 +177,9 @@ public class SqlServerUtils {
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Thread interrupted");
+ }
}
} finally {
if (rs != null) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index db9b90dade..da92f82109 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -312,7 +312,7 @@ public interface JdbcDialect extends Serializable {
String columnName,
int samplingRate,
int fetchSize)
- throws SQLException {
+ throws Exception {
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
@@ -337,6 +337,9 @@ public interface JdbcDialect extends Serializable {
if (count % samplingRate == 0) {
results.add(rs.getObject(1));
}
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Thread interrupted");
+ }
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 5527417e91..03067f6d5e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -131,7 +131,7 @@ public class MysqlDialect implements JdbcDialect {
String columnName,
int samplingRate,
int fetchSize)
- throws SQLException {
+ throws Exception {
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
@@ -158,6 +158,9 @@ public class MysqlDialect implements JdbcDialect {
if (count % samplingRate == 0) {
results.add(rs.getObject(1));
}
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Thread interrupted");
+ }
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index 198dfe47cb..f4da0a8d94 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -85,7 +85,7 @@ public abstract class ChunkSplitter implements AutoCloseable,
Serializable {
}
}
- public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table)
throws SQLException {
+ public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table)
throws Exception {
log.info("Start splitting table {} into chunks...",
table.getTablePath());
long start = System.currentTimeMillis();
@@ -111,7 +111,7 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
}
protected abstract Collection<JdbcSourceSplit> createSplits(
- JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws
SQLException;
+ JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws
SQLException, Exception;
public PreparedStatement generateSplitStatement(JdbcSourceSplit split,
TableSchema schema)
throws SQLException {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index 9dc26d1ef2..d958c405df 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -62,7 +62,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
@Override
protected Collection<JdbcSourceSplit> createSplits(
- JdbcSourceTable table, SeaTunnelRowType splitKey) throws
SQLException {
+ JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception
{
return createDynamicSplits(table, splitKey);
}
@@ -73,7 +73,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
}
private Collection<JdbcSourceSplit> createDynamicSplits(
- JdbcSourceTable table, SeaTunnelRowType splitKey) throws
SQLException {
+ JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception
{
String splitKeyName = splitKey.getFieldNames()[0];
SeaTunnelDataType splitKeyType = splitKey.getFieldType(0);
List<ChunkRange> chunks = splitTableIntoChunks(table, splitKeyName,
splitKeyType);
@@ -105,7 +105,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
private List<ChunkRange> splitTableIntoChunks(
JdbcSourceTable table, String splitColumnName, SeaTunnelDataType
splitColumnType)
- throws SQLException {
+ throws Exception {
Pair<Object, Object> minMax = queryMinMax(table, splitColumnName);
Object min = minMax.getLeft();
Object max = minMax.getRight();
@@ -136,7 +136,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
private List<ChunkRange> evenlyColumnSplitChunks(
JdbcSourceTable table, String splitColumnName, Object min, Object
max, int chunkSize)
- throws SQLException {
+ throws Exception {
TablePath tablePath = table.getTablePath();
double distributionFactorUpper =
config.getSplitEvenDistributionFactorUpperBound();
double distributionFactorLower =
config.getSplitEvenDistributionFactorLowerBound();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index fa588cbd52..70c9d39cf4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -40,7 +40,6 @@ import com.google.common.collect.Lists;
import java.math.BigDecimal;
import java.sql.Date;
-import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -104,7 +103,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
};
@Test
- public void testSampleDataFromColumnSuccess() throws SQLException {
+ public void testSampleDataFromColumnSuccess() throws Exception {
JdbcDialect dialect = new OracleDialect();
JdbcSourceTable table =
JdbcSourceTable.builder()
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
index 4efa5bf651..8fff364c3f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
@@ -288,7 +288,7 @@ public class JdbcIrisIT extends AbstractJdbcIT {
};
@Test
- public void testSampleDataFromColumnSuccess() throws SQLException {
+ public void testSampleDataFromColumnSuccess() throws Exception {
JdbcDialect dialect = new IrisDialect();
JdbcSourceTable table =
JdbcSourceTable.builder()
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
index cf28dd3a78..d7df3e87c6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
@@ -468,7 +468,7 @@ public class JdbcMysqlSplitIT extends TestSuiteBase
implements TestResource {
private JdbcSourceSplit[] getCheckedSplitArray(
Map<String, Object> configMap, CatalogTable table, String
splitKey, int splitNum)
- throws SQLException {
+ throws Exception {
configMap.put("partition_column", splitKey);
DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);