This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ee3b7c3723 [Improve][JDBC Source] Fix Split can not be cancel (#6825)
ee3b7c3723 is described below

commit ee3b7c37239f6f696fe6bb8f43bd7078465ff837
Author: Eric <[email protected]>
AuthorDate: Tue May 14 10:19:11 2024 +0800

    [Improve][JDBC Source] Fix Split can not be cancel (#6825)
---
 .../enumerator/splitter/AbstractJdbcSourceChunkSplitter.java      | 2 +-
 .../base/source/enumerator/splitter/JdbcSourceChunkSplitter.java  | 4 ++--
 .../src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java    | 2 +-
 .../enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java  | 2 +-
 .../seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java  | 2 +-
 .../connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java          | 5 ++++-
 .../cdc/oracle/source/eumerator/OracleChunkSplitter.java          | 2 +-
 .../connectors/seatunnel/cdc/oracle/utils/OracleUtils.java        | 5 ++++-
 .../cdc/postgres/source/enumerator/PostgresChunkSplitter.java     | 4 ++--
 .../connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java    | 5 ++++-
 .../sqlserver/source/source/eumerator/SqlServerChunkSplitter.java | 2 +-
 .../seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java      | 5 ++++-
 .../connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java   | 5 ++++-
 .../seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java       | 5 ++++-
 .../seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java | 4 ++--
 .../connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java    | 8 ++++----
 .../apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java  | 3 +--
 .../apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java    | 2 +-
 .../seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java     | 2 +-
 19 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index e10c70795b..60a208de86 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -114,7 +114,7 @@ public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunk
     }
 
     private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
+            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws Exception {
         final String splitColumnName = splitColumn.name();
         final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
         final Object min = minMax[0];
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
index 3981ddfa7c..d64469d3b3 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -81,7 +81,7 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
     @Deprecated
     Object[] sampleDataFromColumn(
             JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
-            throws SQLException;
+            throws Exception;
 
     /**
      * Performs a sampling operation on the specified column of a table in a JDBC-connected
@@ -97,7 +97,7 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
      */
     default Object[] sampleDataFromColumn(
             JdbcConnection jdbc, TableId tableId, Column column, int samplingRate)
-            throws SQLException {
+            throws Exception {
         return sampleDataFromColumn(jdbc, tableId, column.name(), samplingRate);
     }
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
index 32617fe18c..d7bf573cc1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
@@ -83,7 +83,7 @@ public class JdbcSourceChunkSplitterTest {
         @Override
         public Object[] sampleDataFromColumn(
                 JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
-                throws SQLException {
+                throws Exception {
             return new Object[0];
         }
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
index 6f646eb6be..f89e720c43 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
@@ -195,7 +195,7 @@ public class AbstractJdbcSourceChunkSplitterTest {
         @Override
         public Object[] sampleDataFromColumn(
                 JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
-                throws SQLException {
+                throws Exception {
             return new Object[0];
         }
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index b4982f2cbe..732b21e395 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -57,7 +57,7 @@ public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter {
     @Override
     public Object[] sampleDataFromColumn(
             JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
-            throws SQLException {
+            throws Exception {
         return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName, inverseSamplingRate);
     }
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 9b06ddda96..777be9d1d6 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -146,7 +146,7 @@ public class MySqlUtils {
 
     public static Object[] skipReadAndSortSampleData(
             JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
-            throws SQLException {
+            throws Exception {
         final String sampleQuery =
                 String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
 
@@ -172,6 +172,9 @@ public class MySqlUtils {
                 if (count % inverseSamplingRate == 0) {
                     results.add(rs.getObject(1));
                 }
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new InterruptedException("Thread interrupted");
+                }
             }
         } finally {
             if (rs != null) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
index 52df70cbc8..6525c3a2db 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
@@ -61,7 +61,7 @@ public class OracleChunkSplitter extends AbstractJdbcSourceChunkSplitter {
     @Override
     public Object[] sampleDataFromColumn(
             JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
-            throws SQLException {
+            throws Exception {
         return OracleUtils.skipReadAndSortSampleData(
                 jdbc, tableId, columnName, inverseSamplingRate);
     }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
index 8d67c0f141..cad2a3c836 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
@@ -150,7 +150,7 @@ public class OracleUtils {
 
     public static Object[] skipReadAndSortSampleData(
             JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
-            throws SQLException {
+            throws Exception {
         final String sampleQuery =
                 String.format("SELECT %s FROM %s", quote(columnName), quoteSchemaAndTable(tableId));
 
@@ -176,6 +176,9 @@ public class OracleUtils {
                 if (count % inverseSamplingRate == 0) {
                     results.add(rs.getObject(1));
                 }
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new InterruptedException("Thread interrupted");
+                }
             }
         } finally {
             if (rs != null) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
index 2aab573d2e..fb1aec572d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
@@ -70,7 +70,7 @@ public class PostgresChunkSplitter extends AbstractJdbcSourceChunkSplitter {
     @Override
     public Object[] sampleDataFromColumn(
             JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
-            throws SQLException {
+            throws Exception {
         return PostgresUtils.skipReadAndSortSampleData(
                 jdbc, tableId, columnName, null, inverseSamplingRate);
     }
@@ -78,7 +78,7 @@ public class PostgresChunkSplitter extends AbstractJdbcSourceChunkSplitter {
     @Override
     public Object[] sampleDataFromColumn(
             JdbcConnection jdbc, TableId tableId, Column column, int inverseSamplingRate)
-            throws SQLException {
+            throws Exception {
         return PostgresUtils.skipReadAndSortSampleData(
                 jdbc, tableId, column.name(), column, inverseSamplingRate);
     }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
index b5cd090453..09ea768aa2 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
@@ -161,7 +161,7 @@ public class PostgresUtils {
             String columnName,
             Column column,
             int inverseSamplingRate)
-            throws SQLException {
+            throws Exception {
         columnName = quote(columnName);
         if (column != null) {
             columnName = JDBC_DIALECT.convertType(columnName, column.typeName());
@@ -187,6 +187,9 @@ public class PostgresUtils {
                 if (count % 100000 == 0) {
                     log.info("Processing row index: {}", count);
                 }
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new InterruptedException("Thread interrupted");
+                }
                 if (count % inverseSamplingRate == 0) {
                     results.add(rs.getObject(1));
                 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index b6698f5319..b59bb7789d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -57,7 +57,7 @@ public class SqlServerChunkSplitter extends AbstractJdbcSourceChunkSplitter {
     @Override
     public Object[] sampleDataFromColumn(
             JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
-            throws SQLException {
+            throws Exception {
         return SqlServerUtils.skipReadAndSortSampleData(
                 jdbc, tableId, columnName, inverseSamplingRate);
     }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index db1872fa64..11c2822d15 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -151,7 +151,7 @@ public class SqlServerUtils {
 
     public static Object[] skipReadAndSortSampleData(
             JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
-            throws SQLException {
+            throws Exception {
         final String sampleQuery =
                 String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
 
@@ -177,6 +177,9 @@ public class SqlServerUtils {
                 if (count % inverseSamplingRate == 0) {
                     results.add(rs.getObject(1));
                 }
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new InterruptedException("Thread interrupted");
+                }
             }
         } finally {
             if (rs != null) {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index db9b90dade..da92f82109 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -312,7 +312,7 @@ public interface JdbcDialect extends Serializable {
             String columnName,
             int samplingRate,
             int fetchSize)
-            throws SQLException {
+            throws Exception {
         String sampleQuery;
         if (StringUtils.isNotBlank(table.getQuery())) {
             sampleQuery =
@@ -337,6 +337,9 @@ public interface JdbcDialect extends Serializable {
                     if (count % samplingRate == 0) {
                         results.add(rs.getObject(1));
                     }
+                    if (Thread.currentThread().isInterrupted()) {
+                        throw new InterruptedException("Thread interrupted");
+                    }
                 }
                 Object[] resultsArray = results.toArray();
                 Arrays.sort(resultsArray);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 5527417e91..03067f6d5e 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -131,7 +131,7 @@ public class MysqlDialect implements JdbcDialect {
             String columnName,
             int samplingRate,
             int fetchSize)
-            throws SQLException {
+            throws Exception {
         String sampleQuery;
         if (StringUtils.isNotBlank(table.getQuery())) {
             sampleQuery =
@@ -158,6 +158,9 @@ public class MysqlDialect implements JdbcDialect {
                     if (count % samplingRate == 0) {
                         results.add(rs.getObject(1));
                     }
+                    if (Thread.currentThread().isInterrupted()) {
+                        throw new InterruptedException("Thread interrupted");
+                    }
                 }
                 Object[] resultsArray = results.toArray();
                 Arrays.sort(resultsArray);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index 198dfe47cb..f4da0a8d94 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -85,7 +85,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
         }
     }
 
-    public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws SQLException {
+    public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws Exception {
         log.info("Start splitting table {} into chunks...", table.getTablePath());
         long start = System.currentTimeMillis();
 
@@ -111,7 +111,7 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
     }
 
     protected abstract Collection<JdbcSourceSplit> createSplits(
-            JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws SQLException;
+            JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws SQLException, Exception;
 
     public PreparedStatement generateSplitStatement(JdbcSourceSplit split, TableSchema schema)
             throws SQLException {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index 9dc26d1ef2..d958c405df 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -62,7 +62,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
 
     @Override
     protected Collection<JdbcSourceSplit> createSplits(
-            JdbcSourceTable table, SeaTunnelRowType splitKey) throws SQLException {
+            JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
         return createDynamicSplits(table, splitKey);
     }
 
@@ -73,7 +73,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
     }
 
     private Collection<JdbcSourceSplit> createDynamicSplits(
-            JdbcSourceTable table, SeaTunnelRowType splitKey) throws SQLException {
+            JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
         String splitKeyName = splitKey.getFieldNames()[0];
         SeaTunnelDataType splitKeyType = splitKey.getFieldType(0);
         List<ChunkRange> chunks = splitTableIntoChunks(table, splitKeyName, splitKeyType);
@@ -105,7 +105,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
 
     private List<ChunkRange> splitTableIntoChunks(
             JdbcSourceTable table, String splitColumnName, SeaTunnelDataType splitColumnType)
-            throws SQLException {
+            throws Exception {
         Pair<Object, Object> minMax = queryMinMax(table, splitColumnName);
         Object min = minMax.getLeft();
         Object max = minMax.getRight();
@@ -136,7 +136,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
 
     private List<ChunkRange> evenlyColumnSplitChunks(
             JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize)
-            throws SQLException {
+            throws Exception {
         TablePath tablePath = table.getTablePath();
         double distributionFactorUpper = config.getSplitEvenDistributionFactorUpperBound();
         double distributionFactorLower = config.getSplitEvenDistributionFactorLowerBound();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index fa588cbd52..70c9d39cf4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -40,7 +40,6 @@ import com.google.common.collect.Lists;
 
 import java.math.BigDecimal;
 import java.sql.Date;
-import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -104,7 +103,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
             };
 
     @Test
-    public void testSampleDataFromColumnSuccess() throws SQLException {
+    public void testSampleDataFromColumnSuccess() throws Exception {
         JdbcDialect dialect = new OracleDialect();
         JdbcSourceTable table =
                 JdbcSourceTable.builder()
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
index 4efa5bf651..8fff364c3f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
@@ -288,7 +288,7 @@ public class JdbcIrisIT extends AbstractJdbcIT {
             };
 
     @Test
-    public void testSampleDataFromColumnSuccess() throws SQLException {
+    public void testSampleDataFromColumnSuccess() throws Exception {
         JdbcDialect dialect = new IrisDialect();
         JdbcSourceTable table =
                 JdbcSourceTable.builder()
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
index cf28dd3a78..d7df3e87c6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
@@ -468,7 +468,7 @@ public class JdbcMysqlSplitIT extends TestSuiteBase implements TestResource {
 
     private JdbcSourceSplit[] getCheckedSplitArray(
             Map<String, Object> configMap, CatalogTable table, String splitKey, int splitNum)
-            throws SQLException {
+            throws Exception {
         configMap.put("partition_column", splitKey);
         DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);
 

Reply via email to