This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4beb2b9336 [Feature][Core] Add plugin directory support for each 
connector (#9650)
4beb2b9336 is described below

commit 4beb2b9336fa3da029330a45e71cc8494cd8b1c3
Author: Jia Fan <[email protected]>
AuthorDate: Mon Aug 25 15:35:46 2025 +0800

    [Feature][Core] Add plugin directory support for each connector (#9650)
---
 .../connector-v2/connector-isolated-dependency.md  |  49 ++++++++
 docs/sidebars.js                                   |   1 +
 .../connector-v2/connector-isolated-dependency.md  |  50 ++++++++
 docs/zh/connector-v2/source-common-options.md      |   2 +-
 plugins/README.md                                  |  50 +++++++-
 .../org/apache/seatunnel/common/config/Common.java |   8 +-
 .../apache/seatunnel/common/utils/FileUtils.java   |   4 +
 .../cdc/base/source/IncrementalSource.java         |  42 +++++++
 .../debezium/format/DebeziumJsonFormatTest.java    |   6 +
 .../cdc/mongodb/MongodbIncrementalSource.java      |   5 +
 .../cdc/mysql/source/MySqlIncrementalSource.java   |   6 +
 .../source/MySqlIncrementalSourceFactory.java      |   6 +
 .../OpengaussIncrementalSourceFactory.java         |   8 ++
 .../oracle/config/OracleSourceConfigFactory.java   |   4 +-
 .../cdc/oracle/source/OracleIncrementalSource.java |   6 +
 .../source/OracleIncrementalSourceFactory.java     |   8 ++
 .../postgres/source/PostgresIncrementalSource.java |   6 +
 .../source/PostgresIncrementalSourceFactory.java   |   8 ++
 .../source/SqlServerIncrementalSource.java         |   6 +
 .../source/SqlServerIncrementalSourceFactory.java  |  11 ++
 .../seatunnel/cdc/tidb/source/TiDBSource.java      |  21 ++++
 .../cdc/tidb/source/TiDBSourceFactory.java         |   8 ++
 .../seatunnel/connectors/doris/sink/DorisSink.java |  31 ++++-
 .../doris/source/DorisSourceFactory.java           |   6 +
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |  39 +++++-
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |   7 +-
 .../seatunnel/jdbc/source/JdbcSource.java          |  36 ++++++
 .../seatunnel/starrocks/sink/StarRocksSink.java    |  21 ++++
 .../starter/flink/execution/FlinkExecution.java    |  24 +++-
 .../flink/execution/SourceExecuteProcessor.java    |   3 +-
 .../seatunnel/core/starter/spark/SparkStarter.java |   6 +-
 .../seatunnel/core/starter/spark/SparkStarter.java |   6 +-
 .../spark/execution/SourceExecuteProcessor.java    |   3 +-
 .../starter/spark/execution/SparkExecution.java    |  14 +++
 .../engine/core/job/AbstractJobEnvironment.java    |  20 +--
 .../engine/core/parse/ConnectorInstanceLoader.java |  97 ---------------
 .../core/parse/MultipleTableJobConfigParser.java   |  22 +++-
 .../seatunnel/engine/server/SeaTunnelServer.java   |  13 ++
 .../plugin/discovery/AbstractPluginDiscovery.java  |  92 +++++++++++---
 .../plugin/discovery/PluginDiscovery.java          |   7 ++
 .../SeaTunnelSourcePluginDiscoveryTest.java        | 137 ++++++++++++++++++++-
 .../duplicate/connectors/plugin-mapping.properties |   2 +
 .../translation/source/CoordinatedSource.java      |  14 +++
 .../translation/source/ParallelSource.java         |  14 +++
 .../translation/flink/sink/FlinkSink.java          |  13 ++
 .../translation/flink/source/FlinkSource.java      |  25 ++++
 .../spark/sink/writer/SparkDataWriterFactory.java  |  13 ++
 .../spark/sink/SeaTunnelBatchWrite.java            |  13 ++
 .../write/SeaTunnelSparkDataWriterFactory.java     |  13 ++
 49 files changed, 850 insertions(+), 156 deletions(-)

diff --git a/docs/en/connector-v2/connector-isolated-dependency.md 
b/docs/en/connector-v2/connector-isolated-dependency.md
new file mode 100644
index 0000000000..dfb05e79ef
--- /dev/null
+++ b/docs/en/connector-v2/connector-isolated-dependency.md
@@ -0,0 +1,49 @@
+# Connector Isolated Dependency Loading Mechanism
+
+SeaTunnel provides an isolated dependency loading mechanism for each 
connector, making it easier for users to manage individual dependencies for 
different connectors, while avoiding dependency conflicts and improving system 
extensibility.
+When loading a connector, SeaTunnel will search for and load the connector's 
own dependency jars from the `${SEATUNNEL_HOME}/plugins/connector-xxx` 
directory. This ensures that the dependencies required by different connectors 
do not interfere with each other, which is helpful for managing a large number 
of connectors in complex environments.
+
+## Principle
+
+Each connector needs to place its own dependency jars in a dedicated 
subdirectory under `${SEATUNNEL_HOME}/plugins/connector-xxx` (manual creation 
required).
+The subdirectory name is specified by the value in the `plugin-mapping` file. 
When SeaTunnel starts and loads connectors, it will only load jars from the 
corresponding directory, thus achieving dependency isolation.
+
+Currently, the Zeta engine ensures that jars for different connectors in the 
same job are loaded separately. The other two engines still load all connector 
dependency jars together, so placing different versions of jars for the same 
job in Spark/Flink environments may cause dependency conflicts.
+
+## Directory Structure Example
+
+- Use `${SEATUNNEL_HOME}/connectors/plugin-mapping.properties` to get the 
folder name for each connector.
+
+For example, for AmazonDynamodb, suppose the following configuration exists in 
the `plugin-mapping` file:
+```
+seatunnel.source.AmazonDynamodb = connector-amazondynamodb
+```
+
+The corresponding connector dependency directory is the value 
`connector-amazondynamodb`.
+
+The final directory structure is as follows:
+
+```
+SEATUNNEL_HOME/
+  plugins/
+    connector-amazondynamodb/
+      dependency1.jar
+      dependency2.jar
+    connector-xxx/
+      dependencyA.jar
+      dependencyB.jar
+```
+
+## Limitations
+
+- For the Zeta engine, please ensure that the 
`${SEATUNNEL_HOME}/plugins/connector-xxx` directory structure is consistent 
across all nodes. Each node must contain the same subdirectories and dependency 
jars.
+- Any directory or jar that does not start with `connector-` will be treated 
as a common dependency directory, and all engines and connectors will load such 
jars.
+- In the Zeta engine, you can achieve shared dependencies for all connectors 
by placing common jars in the `${SEATUNNEL_HOME}/lib/` directory.
+
+## Verification
+
+- By checking the job logs, you can confirm that each connector only loads its 
own dependency jars.
+
+    ```log
+    2025-08-13T17:55:48.7732601Z [] 2025-08-13 17:55:47,270 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - find connector 
jar and dependency for PluginIdentifier{engineType='seatunnel', 
pluginType='source', pluginName='Jdbc'}: 
[file:/tmp/seatunnel/plugins/Jdbc/lib/vertica-jdbc-12.0.3-0.jar, 
file:/tmp/seatunnel/connectors/connector-jdbc-2.3.12-SNAPSHOT-2.12.15.jar]
+    ```
diff --git a/docs/sidebars.js b/docs/sidebars.js
index 5f1a2a28f2..cfd589ec11 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -157,6 +157,7 @@ const sidebars = {
                 },
                 "connector-v2/source-common-options",
                 "connector-v2/sink-common-options",
+                "connector-v2/connector-isolated-dependency",
                 "connector-v2/Error-Quick-Reference-Manual",
                 "connector-v2/Config-Encryption-Decryption"
             ]
diff --git a/docs/zh/connector-v2/connector-isolated-dependency.md 
b/docs/zh/connector-v2/connector-isolated-dependency.md
new file mode 100644
index 0000000000..2e33fcf073
--- /dev/null
+++ b/docs/zh/connector-v2/connector-isolated-dependency.md
@@ -0,0 +1,50 @@
+# Connector 依赖隔离加载机制
+
+SeaTunnel 提供了针对每个 connector 的依赖隔离加载机制,方便用户管理不同连接器单独的依赖,同时避免依赖冲突并提升系统的可扩展性。
+当加载 connector 时,SeaTunnel 会从 `${SEATUNNEL_HOME}` 下的 `plugins/connector-xxx` 
目录中,查找并加载该 connector 独立的依赖 jar。这种方式确保了不同 connector 所需的依赖不会相互影响,便于在复杂环境下管理大量 
connector。
+
+## 实现原理
+
+每个 connector 需要将自己的依赖 jar 放置在 `${SEATUNNEL_HOME}/plugins/connector-xxx` 
目录下的独立子目录中(需要手动创建)。
+子目录名称由 `plugin-mapping` 文件中的 value 值指定。SeaTunnel 启动并加载 connector 时,只会加载对应目录下的 
jar,从而实现依赖的隔离。
+
+目前,Zeta 引擎会保证同一个任务不同connector的jar分开加载。其他两个引擎仍然会将所有 connector 的依赖 jar 
一起加载,同一个任务放置了不同版本的jar在Spark/Flink环境可能导致依赖冲突。
+
+## 目录结构示例
+
+- 通过`${SEATUNNEL_HOME}/connectors/plugin-mapping.properties` 
获取每个connector对应的文件夹目录命名。
+
+以AmazonDynamodb为例,假设在 `plugin-mapping` 文件中有以下配置:
+```
+seatunnel.source.AmazonDynamodb = connector-amazondynamodb
+```
+
+则对应的connector依赖目录就是value值 `connector-amazondynamodb`。
+
+最终的目录结构如下所示:
+
+```
+SEATUNNEL_HOME/
+  plugins/
+    connector-amazondynamodb/
+      dependency1.jar
+      dependency2.jar
+    connector-xxx/
+      dependencyA.jar
+      dependencyB.jar
+```
+
+## 限制说明
+
+- 在Zeta引擎中,请确保所有节点的 `${SEATUNNEL_HOME}/plugins/` 目录结构一致。都需要包含相同的子目录和依赖 jar。
+- 任何没有以`connector-`开头的目录或者jar都将被当作通用依赖目录处理,所有引擎和connector都会加载此类jar。
+- 在Zeta引擎中,可以通过将通用的jar放到 `${SEATUNNEL_HOME}/lib/` 目录下来实现所有 connector 的共享依赖。
+
+## 验证
+
+- 通过追踪任务日志,确认每个 connector 只加载了其独立的依赖 jar。
+
+    ```log
+    2025-08-13T17:55:48.7732601Z [] 2025-08-13 17:55:47,270 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - find connector 
jar and dependency for PluginIdentifier{engineType='seatunnel', 
pluginType='source', pluginName='Jdbc'}: 
[file:/tmp/seatunnel/plugins/Jdbc/lib/vertica-jdbc-12.0.3-0.jar, 
file:/tmp/seatunnel/connectors/connector-jdbc-2.3.12-SNAPSHOT-2.12.15.jar]
+    ```
+
diff --git a/docs/zh/connector-v2/source-common-options.md 
b/docs/zh/connector-v2/source-common-options.md
index 079acd60f8..4189c07591 100644
--- a/docs/zh/connector-v2/source-common-options.md
+++ b/docs/zh/connector-v2/source-common-options.md
@@ -2,7 +2,7 @@
 sidebar_position: 3
 ---
 
-# Source Common Options
+# Source 常用选项
 
 > Source connector 的常用参数
 
diff --git a/plugins/README.md b/plugins/README.md
index 0d58dfba49..dfb05e79ef 100644
--- a/plugins/README.md
+++ b/plugins/README.md
@@ -1,11 +1,49 @@
-# Introduction of plugins directory
+# Connector Isolated Dependency Loading Mechanism
 
-This directory used to store some third party jar package dependency by 
connector running, such as jdbc drivers.
+SeaTunnel provides an isolated dependency loading mechanism for each 
connector, making it easier for users to manage individual dependencies for 
different connectors, while avoiding dependency conflicts and improving system 
extensibility.
+When loading a connector, SeaTunnel will search for and load the connector's 
own dependency jars from the `${SEATUNNEL_HOME}/plugins/connector-xxx` 
directory. This ensures that the dependencies required by different connectors 
do not interfere with each other, which is helpful for managing a large number 
of connectors in complex environments.
 
-!!!Attention: If you use Zeta Engine, please add jar to `$SEATUNNEL_HOME/lib/` 
directory on each node.
+## Principle
 
-## directory structure
+Each connector needs to place its own dependency jars in a dedicated 
subdirectory under `${SEATUNNEL_HOME}/plugins/connector-xxx` (manual creation 
required).
+The subdirectory name is specified by the value in the `plugin-mapping` file. 
When SeaTunnel starts and loads connectors, it will only load jars from the 
corresponding directory, thus achieving dependency isolation.
 
-The jar dependency  by connector need put in `plugins/${connector name}/lib/` 
dir.
+Currently, the Zeta engine ensures that jars for different connectors in the 
same job are loaded separately. The other two engines still load all connector 
dependency jars together, so placing different versions of jars for the same 
job in Spark/Flink environments may cause dependency conflicts.
 
-For example jdbc driver jars need put in 
`${seatunnel_install_home}/plugins/jdbc/lib/`
\ No newline at end of file
+## Directory Structure Example
+
+- Use `${SEATUNNEL_HOME}/connectors/plugin-mapping.properties` to get the 
folder name for each connector.
+
+For example, for AmazonDynamodb, suppose the following configuration exists in 
the `plugin-mapping` file:
+```
+seatunnel.source.AmazonDynamodb = connector-amazondynamodb
+```
+
+The corresponding connector dependency directory is the value 
`connector-amazondynamodb`.
+
+The final directory structure is as follows:
+
+```
+SEATUNNEL_HOME/
+  plugins/
+    connector-amazondynamodb/
+      dependency1.jar
+      dependency2.jar
+    connector-xxx/
+      dependencyA.jar
+      dependencyB.jar
+```
+
+## Limitations
+
+- For the Zeta engine, please ensure that the 
`${SEATUNNEL_HOME}/plugins/connector-xxx` directory structure is consistent 
across all nodes. Each node must contain the same subdirectories and dependency 
jars.
+- Any directory or jar that does not start with `connector-` will be treated 
as a common dependency directory, and all engines and connectors will load such 
jars.
+- In the Zeta engine, you can achieve shared dependencies for all connectors 
by placing common jars in the `${SEATUNNEL_HOME}/lib/` directory.
+
+## Verification
+
+- By checking the job logs, you can confirm that each connector only loads its 
own dependency jars.
+
+    ```log
+    2025-08-13T17:55:48.7732601Z [] 2025-08-13 17:55:47,270 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - find connector 
jar and dependency for PluginIdentifier{engineType='seatunnel', 
pluginType='source', pluginName='Jdbc'}: 
[file:/tmp/seatunnel/plugins/Jdbc/lib/vertica-jdbc-12.0.3-0.jar, 
file:/tmp/seatunnel/connectors/connector-jdbc-2.3.12-SNAPSHOT-2.12.15.jar]
+    ```
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index df7fe5cef6..e22fccd1fb 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -173,7 +173,7 @@ public class Common {
     }
 
     /** return plugin's dependent jars, which located in 
'plugins/${pluginName}/lib/*'. */
-    public static List<Path> getPluginsJarDependencies() {
+    public static List<Path> 
getPluginsJarDependenciesWithoutConnectorDependency() {
         Path pluginRootDir = Common.pluginRootDir();
         if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) 
{
             return Collections.emptyList();
@@ -183,6 +183,12 @@ public class Common {
                             it ->
                                     pluginRootDir.relativize(it).getNameCount()
                                             == PLUGIN_LIB_DIR_DEPTH)
+                    .filter(
+                            it ->
+                                    !it.getParent()
+                                            .getParent()
+                                            
.getName(it.getParent().getParent().getNameCount() - 1)
+                                            .startsWith("connector-"))
                     .filter(it -> it.getParent().endsWith("lib"))
                     .filter(it -> it.getFileName().toString().endsWith(".jar"))
                     .collect(Collectors.toList());
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
index 90ff8e5d49..b228d1be1a 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -35,6 +35,7 @@ import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
@@ -45,6 +46,9 @@ import java.util.stream.Stream;
 public class FileUtils {
 
     public static List<URL> searchJarFiles(@NonNull Path directory) throws 
IOException {
+        if (!directory.toFile().exists()) {
+            return new ArrayList<>();
+        }
         try (Stream<Path> paths = Files.walk(directory, 
FileVisitOption.FOLLOW_LINKS)) {
             return paths.filter(path -> path.toString().endsWith(".jar"))
                     .map(
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 85340c4a3a..415aa32467 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -66,7 +66,9 @@ import 
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJs
 
 import io.debezium.relational.TableId;
 import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -74,6 +76,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -82,9 +85,22 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 @NoArgsConstructor
+@Slf4j
 public abstract class IncrementalSource<T, C extends SourceConfig>
         implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
 
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     protected ReadonlyConfig readonlyConfig;
     protected SourceConfig.Factory<C> configFactory;
     protected OffsetFactory offsetFactory;
@@ -191,6 +207,8 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
 
     public abstract OffsetFactory createOffsetFactory(ReadonlyConfig config);
 
+    public abstract Optional<String> driverName();
+
     @Override
     public Boundedness getBoundedness() {
         return stopMode == StopMode.NEVER ? Boundedness.UNBOUNDED : 
Boundedness.BOUNDED;
@@ -200,6 +218,14 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
     @Override
     public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context 
readerContext)
             throws Exception {
+        // Load the JDBC driver in to DriverManager
+        if (driverName().isPresent()) {
+            try {
+                Class.forName(driverName().get());
+            } catch (Exception e) {
+                log.warn("Failed to load JDBC driver: {}", driverName().get(), 
e);
+            }
+        }
         // create source config for the given subtask (e.g. unique server id)
         C sourceConfig = 
configFactory.create(readerContext.getIndexOfSubtask());
         BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
@@ -232,6 +258,14 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
     @Override
     public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> 
createEnumerator(
             SourceSplitEnumerator.Context<SourceSplitBase> enumeratorContext) 
throws Exception {
+        // Load the JDBC driver in to DriverManager
+        if (driverName().isPresent()) {
+            try {
+                Class.forName(driverName().get());
+            } catch (Exception e) {
+                log.warn("Failed to load JDBC driver: {}", driverName().get(), 
e);
+            }
+        }
         C sourceConfig = configFactory.create(0);
         final List<TableId> remainingTables =
                 dataSourceDialect.discoverDataCollections(sourceConfig);
@@ -273,6 +307,14 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
             SourceSplitEnumerator.Context<SourceSplitBase> enumeratorContext,
             PendingSplitsState checkpointState)
             throws Exception {
+        // Load the JDBC driver in to DriverManager
+        if (driverName().isPresent()) {
+            try {
+                Class.forName(driverName().get());
+            } catch (Exception e) {
+                log.warn("Failed to load JDBC driver: {}", driverName().get(), 
e);
+            }
+        }
         C sourceConfig = configFactory.create(0);
         Set<TableId> capturedTables =
                 new 
HashSet<>(dataSourceDialect.discoverDataCollections(sourceConfig));
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
index adb5b7389d..0a438180a4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 class DebeziumJsonFormatTest {
 
@@ -105,6 +106,11 @@ class DebeziumJsonFormatTest {
         public String getPluginName() {
             return "";
         }
+
+        @Override
+        public Optional<String> driverName() {
+            return Optional.empty();
+        }
     }
 
     @Test
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
index 996eda76e8..a10f6ad15e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
@@ -136,4 +136,9 @@ public class MongodbIncrementalSource<T> extends 
IncrementalSource<T, MongodbSou
             SourceConfig sourceConfig, SourceReader.Context context) {
         return new MongoDBRecordEmitter<>(deserializationSchema, 
offsetFactory, context);
     }
+
+    @Override
+    public Optional<String> driverName() {
+        return Optional.empty();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 50d745cacd..d75fcbe981 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -52,6 +52,7 @@ import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -163,4 +164,9 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
                 SchemaChangeType.RENAME_COLUMN,
                 SchemaChangeType.UPDATE_COLUMN);
     }
+
+    @Override
+    public Optional<String> driverName() {
+        return Optional.of("com.mysql.cj.jdbc.Driver");
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index a37b2bdc0c..13f39beeec 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -103,6 +103,12 @@ public class MySqlIncrementalSourceFactory extends 
BaseChangeStreamTableSourceFa
             TableSource<T, SplitT, StateT> restoreSource(
                     TableSourceFactoryContext context, List<CatalogTable> 
restoreTables) {
         return () -> {
+            // Load the JDBC driver in to DriverManager
+            try {
+                Class.forName("com.mysql.cj.jdbc.Driver");
+            } catch (Exception e) {
+                log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver 
", e);
+            }
             ReadonlyConfig config = context.getOptions();
             List<CatalogTable> catalogTables =
                     CatalogTableUtil.getCatalogTables(config, 
context.getClassLoader());
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
index bb52f01242..5418162ab0 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
@@ -38,12 +38,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresSou
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
 
 @AutoService(Factory.class)
+@Slf4j
 public class OpengaussIncrementalSourceFactory implements TableSourceFactory {
     private static final String IDENTIFIER = "Opengauss-CDC";
 
@@ -89,6 +91,12 @@ public class OpengaussIncrementalSourceFactory implements 
TableSourceFactory {
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () -> {
+            // Load the JDBC driver in to DriverManager
+            try {
+                Class.forName("org.postgresql.Driver");
+            } catch (Exception e) {
+                log.warn("Failed to load JDBC driver org.postgresql.Driver", 
e);
+            }
             List<CatalogTable> catalogTables =
                     CatalogTableUtil.getCatalogTables(
                             "Postgres", context.getOptions(), 
context.getClassLoader());
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
index fa240bc158..1dda36a4be 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
@@ -73,8 +73,8 @@ public class OracleSourceConfigFactory extends 
JdbcSourceConfigFactory {
 
         try {
             Class.forName(DRIVER_CLASS_NAME);
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver {}", DRIVER_CLASS_NAME, e);
         }
 
         Properties props = new Properties();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
index 2b59e67209..28782f6098 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
@@ -52,6 +52,7 @@ import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -126,6 +127,11 @@ public class OracleIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceC
                 (OracleSourceConfigFactory) configFactory, (OracleDialect) 
dataSourceDialect);
     }
 
+    @Override
+    public Optional<String> driverName() {
+        return Optional.of("oracle.jdbc.OracleDriver");
+    }
+
     private Map<TableId, Struct> tableChanges() {
         JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
         OracleDialect dialect =
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
index 5a22cf09e6..7b32421a7c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
@@ -38,12 +38,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceC
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
 
 @AutoService(Factory.class)
+@Slf4j
 public class OracleIncrementalSourceFactory extends 
BaseChangeStreamTableSourceFactory {
     @Override
     public String factoryIdentifier() {
@@ -105,6 +107,12 @@ public class OracleIncrementalSourceFactory extends 
BaseChangeStreamTableSourceF
             TableSource<T, SplitT, StateT> restoreSource(
                     TableSourceFactoryContext context, List<CatalogTable> 
restoreTables) {
         return () -> {
+            // Load the JDBC driver in to DriverManager
+            try {
+                Class.forName("oracle.jdbc.OracleDriver");
+            } catch (Exception e) {
+                log.warn("Failed to load JDBC driver {}", 
"oracle.jdbc.OracleDriver", e);
+            }
             List<CatalogTable> catalogTables =
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
index ed8ccb254b..a9e10e9516 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
@@ -50,6 +50,7 @@ import io.debezium.util.SchemaNameAdjuster;
 import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -123,6 +124,11 @@ public class PostgresIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourc
                 (PostgresSourceConfigFactory) configFactory, (PostgresDialect) 
dataSourceDialect);
     }
 
+    @Override
+    public Optional<String> driverName() {
+        return Optional.of("org.postgresql.Driver");
+    }
+
     private Map<TableId, Struct> tableChanges() {
         JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
         PostgresDialect dialect =
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSourceFactory.java
index 3a248ee37e..91e8f3c881 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSourceFactory.java
@@ -36,12 +36,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.option.PostgresOpt
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
 
 @AutoService(Factory.class)
+@Slf4j
 public class PostgresIncrementalSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
@@ -85,6 +87,12 @@ public class PostgresIncrementalSourceFactory implements 
TableSourceFactory {
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () -> {
+            // Load the JDBC driver in to DriverManager
+            try {
+                Class.forName("org.postgresql.Driver");
+            } catch (Exception e) {
+                log.warn("Failed to load JDBC driver {}", 
"org.postgresql.Driver", e);
+            }
             List<CatalogTable> catalogTables =
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSource.java
index 0b82a254c3..0327b89b06 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSource.java
@@ -41,6 +41,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
 
 import java.time.ZoneId;
 import java.util.List;
+import java.util.Optional;
 
 public class SqlServerIncrementalSource<T> extends IncrementalSource<T, 
JdbcSourceConfig>
         implements SupportParallelism {
@@ -108,4 +109,9 @@ public class SqlServerIncrementalSource<T> extends 
IncrementalSource<T, JdbcSour
         return new LsnOffsetFactory(
                 (SqlServerSourceConfigFactory) configFactory, 
(SqlServerDialect) dataSourceDialect);
     }
+
+    @Override
+    public Optional<String> driverName() {
+        return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceFactory.java
index ca124c25e5..96b661f240 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceFactory.java
@@ -37,12 +37,14 @@ import 
org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
 
 @AutoService(Factory.class)
+@Slf4j
 public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
 
     @Override
@@ -101,6 +103,15 @@ public class SqlServerIncrementalSourceFactory implements 
TableSourceFactory {
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () -> {
+            // Load the JDBC driver in to DriverManager
+            try {
+                Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+            } catch (Exception e) {
+                log.warn(
+                        "Failed to load JDBC driver {}",
+                        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
+                        e);
+            }
             List<CatalogTable> catalogTables =
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
index fef7adab34..5827b1c72d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
@@ -33,9 +33,12 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDB
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.util.Collections;
 import java.util.List;
 
+@Slf4j
 public class TiDBSource
         implements SeaTunnelSource<SeaTunnelRow, TiDBSourceSplit, 
TiDBSourceCheckpointState>,
                 SupportParallelism,
@@ -91,6 +94,12 @@ public class TiDBSource
     @Override
     public SourceReader<SeaTunnelRow, TiDBSourceSplit> 
createReader(SourceReader.Context context)
             throws Exception {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ", 
e);
+        }
         return new TiDBSourceReader(context, config, catalogTable);
     }
 
@@ -105,6 +114,12 @@ public class TiDBSource
     @Override
     public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> 
createEnumerator(
             SourceSplitEnumerator.Context<TiDBSourceSplit> context) throws 
Exception {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ", 
e);
+        }
         return new TiDBSourceSplitEnumerator(context, config);
     }
 
@@ -122,6 +137,12 @@ public class TiDBSource
             SourceSplitEnumerator.Context<TiDBSourceSplit> context,
             TiDBSourceCheckpointState checkpointState)
             throws Exception {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ", 
e);
+        }
         return new TiDBSourceSplitEnumerator(context, config, checkpointState);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
index 39d8b03739..254bc584c9 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
@@ -32,10 +32,12 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 
 @AutoService(Factory.class)
+@Slf4j
 public class TiDBSourceFactory implements TableSourceFactory {
     /**
      * Returns a unique identifier among same factory interfaces.
@@ -87,6 +89,12 @@ public class TiDBSourceFactory implements TableSourceFactory 
{
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () -> {
+            // Load the JDBC driver in to DriverManager
+            try {
+                Class.forName("com.mysql.cj.jdbc.Driver");
+            } catch (Exception e) {
+                log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver 
", e);
+            }
             ReadonlyConfig config = context.getOptions();
             TiDBCatalogFactory catalogFactory = new TiDBCatalogFactory();
             // Build tidb catalog.
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 66b19e3971..88bee59012 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -45,6 +45,8 @@ import 
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
 import 
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkStateSerializer;
 import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -53,6 +55,7 @@ import java.util.Optional;
 
 import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
 
+@Slf4j
 public class DorisSink
         implements SeaTunnelSink<SeaTunnelRow, DorisSinkState, 
DorisCommitInfo, DorisCommitInfo>,
                 SupportSaveMode,
@@ -68,6 +71,12 @@ public class DorisSink
         this.config = config;
         this.catalogTable = catalogTable;
         this.dorisSinkConfig = DorisSinkConfig.of(config);
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ", 
e);
+        }
     }
 
     @Override
@@ -82,6 +91,12 @@ public class DorisSink
 
     @Override
     public DorisSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ", 
e);
+        }
         return new DorisSinkWriter(
                 context, Collections.emptyList(), catalogTable, 
dorisSinkConfig, jobId);
     }
@@ -89,6 +104,12 @@ public class DorisSink
     @Override
     public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> 
restoreWriter(
             SinkWriter.Context context, List<DorisSinkState> states) throws 
IOException {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ", 
e);
+        }
         return new DorisSinkWriter(context, states, catalogTable, 
dorisSinkConfig, jobId);
     }
 
@@ -99,6 +120,12 @@ public class DorisSink
 
     @Override
     public Optional<SinkCommitter<DorisCommitInfo>> createCommitter() throws 
IOException {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ", 
e);
+        }
         return Optional.of(new DorisCommitter(dorisSinkConfig));
     }
 
@@ -112,8 +139,8 @@ public class DorisSink
         // Load the JDBC driver in to DriverManager
         try {
             Class.forName("com.mysql.cj.jdbc.Driver");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ", 
e);
         }
         CatalogFactory catalogFactory =
                 discoverFactory(
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
index 1bdbd86883..d025eaa061 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
@@ -80,6 +80,12 @@ public class DorisSourceFactory implements 
TableSourceFactory {
     @Override
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ", 
e);
+        }
         DorisSourceConfig dorisSourceConfig = 
DorisSourceConfig.of(context.getOptions());
         List<DorisTableConfig> dorisTableConfigList = 
dorisSourceConfig.getTableConfigList();
         Map<TablePath, DorisSourceTable> dorisSourceTables = new HashMap<>();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 62b47a3f97..ee61b27557 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -93,6 +93,15 @@ public class JdbcSink
             SchemaSaveMode schemaSaveMode,
             DataSaveMode dataSaveMode,
             CatalogTable catalogTable) {
+        // Load the JDBC driver in to DriverManager
+        try {
+            
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (Exception e) {
+            log.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
+        }
         this.config = config;
         this.jdbcSinkConfig = jdbcSinkConfig;
         this.dialect = dialect;
@@ -111,8 +120,11 @@ public class JdbcSink
     public AbstractJdbcSinkWriter createWriter(SinkWriter.Context context) {
         try {
             
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            log.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
         }
         TablePath sinkTablePath = catalogTable.getTablePath();
         AbstractJdbcSinkWriter sinkWriter;
@@ -158,8 +170,11 @@ public class JdbcSink
             SinkWriter.Context context, List<JdbcSinkState> states) throws 
IOException {
         try {
             
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            log.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
         }
         TablePath sinkTablePath = catalogTable.getTablePath();
         if (jdbcSinkConfig.isExactlyOnce()) {
@@ -204,6 +219,15 @@ public class JdbcSink
     @Override
     public Optional<SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo>>
             createAggregatedCommitter() {
+        // Load the JDBC driver in to DriverManager
+        try {
+            
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (Exception e) {
+            log.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
+        }
         if (jdbcSinkConfig.isExactlyOnce()) {
             return Optional.of(new 
JdbcSinkAggregatedCommitter(jdbcSinkConfig));
         }
@@ -235,8 +259,11 @@ public class JdbcSink
     public Optional<SaveModeHandler> getSaveModeHandler() {
         try {
             
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            log.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
         }
         if (catalogTable != null) {
             Optional<Catalog> catalogOptional = getCatalog();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 6ffa0acd51..3137f427e8 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -76,8 +76,11 @@ public class JdbcSinkWriter extends 
AbstractJdbcSinkWriter<ConnectionPoolManager
         HikariDataSource ds = new HikariDataSource();
         try {
             
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            log.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
         }
         ds.setIdleTimeout(30 * 1000);
         ds.setMaximumPoolSize(queueSize);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 8443aeb044..616842bc7f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -52,6 +52,15 @@ public class JdbcSource
 
     @SneakyThrows
     public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
+        // Load the JDBC driver in to DriverManager
+        try {
+            
Class.forName(jdbcSourceConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSourceConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
+        }
         this.jdbcSourceConfig = jdbcSourceConfig;
         this.jdbcSourceTables =
                 JdbcCatalogUtils.getTables(
@@ -79,6 +88,15 @@ public class JdbcSource
     @Override
     public SourceReader<SeaTunnelRow, JdbcSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
+        // Load the JDBC driver in to DriverManager
+        try {
+            
Class.forName(jdbcSourceConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSourceConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
+        }
         Map<TablePath, CatalogTable> tables = new HashMap<>();
         for (TablePath tablePath : jdbcSourceTables.keySet()) {
             tables.put(tablePath, 
jdbcSourceTables.get(tablePath).getCatalogTable());
@@ -94,6 +112,15 @@ public class JdbcSource
     @Override
     public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> 
createEnumerator(
             SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext) 
throws Exception {
+        // Load the JDBC driver in to DriverManager
+        try {
+            
Class.forName(jdbcSourceConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSourceConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
+        }
         return new JdbcSourceSplitEnumerator(
                 enumeratorContext, jdbcSourceConfig, jdbcSourceTables, null);
     }
@@ -103,6 +130,15 @@ public class JdbcSource
             SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext,
             JdbcSourceState checkpointState)
             throws Exception {
+        // Load the JDBC driver in to DriverManager
+        try {
+            
Class.forName(jdbcSourceConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to load JDBC driver {}",
+                    jdbcSourceConfig.getJdbcConnectionConfig().getDriverName(),
+                    e);
+        }
         return new JdbcSourceSplitEnumerator(
                 enumeratorContext, jdbcSourceConfig, jdbcSourceTables, 
checkpointState);
     }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 06fdbb97bd..1b89ef3f4e 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -37,10 +37,13 @@ import 
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCata
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
+@Slf4j
 public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportSaveMode, SupportSchemaEvolutionSink, 
SupportMultiTableSink {
 
@@ -56,6 +59,12 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         this.catalogTable = catalogTable;
         this.dataSaveMode = sinkConfig.getDataSaveMode();
         this.schemaSaveMode = sinkConfig.getSchemaSaveMode();
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver {}", 
"com.mysql.cj.jdbc.Driver", e);
+        }
     }
 
     @Override
@@ -65,12 +74,24 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
 
     @Override
     public StarRocksSinkWriter createWriter(SinkWriter.Context context) {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver {}", 
"com.mysql.cj.jdbc.Driver", e);
+        }
         TablePath sinkTablePath = catalogTable.getTablePath();
         return new StarRocksSinkWriter(sinkConfig, tableSchema, sinkTablePath);
     }
 
     @Override
     public Optional<SaveModeHandler> getSaveModeHandler() {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (Exception e) {
+            log.warn("Failed to load JDBC driver {}", 
"com.mysql.cj.jdbc.Driver", e);
+        }
         TablePath tablePath =
                 TablePath.of(
                         catalogTable.getTableId().getDatabaseName(),
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 0d74c59c79..ae2c5039d4 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -45,9 +45,12 @@ import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.file.Path;
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -56,6 +59,18 @@ import java.util.stream.Stream;
 /** Used to execute a SeaTunnelTask. */
 public class FlinkExecution implements TaskExecution {
 
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkExecution.class);
 
     private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
@@ -102,7 +117,8 @@ public class FlinkExecution implements TaskExecution {
                         jarPaths, envConfig, 
config.getConfigList(Constants.SINK), jobContext);
 
         this.flinkRuntimeEnvironment =
-                
FlinkRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths));
+                FlinkRuntimeEnvironment.getInstance(
+                        this.registerPlugin(config, new HashSet<>(jarPaths)));
 
         
this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
         
this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
@@ -154,7 +170,7 @@ public class FlinkExecution implements TaskExecution {
                             Common.getThirdPartyJars(
                                     
envConfig.getString(EnvCommonOptions.JARS.key())));
         }
-        thirdPartyJars.addAll(Common.getPluginsJarDependencies());
+        
thirdPartyJars.addAll(Common.getPluginsJarDependenciesWithoutConnectorDependency());
         List<URL> jarDependencies =
                 Stream.concat(thirdPartyJars.stream(), 
Common.getLibJars().stream())
                         .map(Path::toUri)
@@ -173,7 +189,7 @@ public class FlinkExecution implements TaskExecution {
         jarPaths.addAll(jarDependencies);
     }
 
-    private Config registerPlugin(Config config, List<URL> jars) {
+    private Config registerPlugin(Config config, Collection<URL> jars) {
         config =
                 this.injectJarsToConfig(
                         config, ConfigUtil.joinPath("env", "pipeline", 
"jars"), jars);
@@ -181,7 +197,7 @@ public class FlinkExecution implements TaskExecution {
                 config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), 
jars);
     }
 
-    private Config injectJarsToConfig(Config config, String path, List<URL> 
jars) {
+    private Config injectJarsToConfig(Config config, String path, 
Collection<URL> jars) {
         List<URL> validJars = new ArrayList<>();
         for (URL jarUrl : jars) {
             if (new File(jarUrl.getFile()).exists()) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 29479ceba6..182592e9ca 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -117,7 +117,8 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
                             PluginType.SOURCE.getType(),
                             sourceConfig.getString(PLUGIN_NAME.key()));
             jars.addAll(
-                    
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+                    sourcePluginDiscovery.getPluginJarAndDependencyPaths(
+                            Lists.newArrayList(pluginIdentifier)));
 
             Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> source =
                     FactoryUtil.createAndPrepareSource(
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 787c153ba3..52f1a2ee99 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -111,7 +111,6 @@ public class SparkStarter implements Starter {
         setSparkConf();
         Common.setDeployMode(commandArgs.getDeployMode());
         Common.setStarter(true);
-        this.jars.addAll(Common.getPluginsJarDependencies());
         this.jars.addAll(Common.getLibJars());
         this.jars.addAll(getConnectorJarDependencies());
         this.jars.addAll(
@@ -152,13 +151,14 @@ public class SparkStarter implements Starter {
         SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery =
                 new SeaTunnelSinkPluginDiscovery();
         pluginJars.addAll(
-                seaTunnelSourcePluginDiscovery.getPluginJarPaths(
+                seaTunnelSourcePluginDiscovery.getPluginJarAndDependencyPaths(
                         getPluginIdentifiers(config, PluginType.SOURCE)));
         pluginJars.addAll(
-                seaTunnelSinkPluginDiscovery.getPluginJarPaths(
+                seaTunnelSinkPluginDiscovery.getPluginJarAndDependencyPaths(
                         getPluginIdentifiers(config, PluginType.SINK)));
         return pluginJars.stream()
                 .map(url -> new File(url.getPath()).toPath())
+                .distinct()
                 .collect(Collectors.toList());
     }
 
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 555aeaed8f..e734112e85 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -111,7 +111,6 @@ public class SparkStarter implements Starter {
         setSparkConf();
         Common.setDeployMode(commandArgs.getDeployMode());
         Common.setStarter(true);
-        this.jars.addAll(Common.getPluginsJarDependencies());
         this.jars.addAll(Common.getLibJars());
         this.jars.addAll(getConnectorJarDependencies());
         this.jars.addAll(
@@ -152,13 +151,14 @@ public class SparkStarter implements Starter {
         SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery =
                 new SeaTunnelSinkPluginDiscovery();
         pluginJars.addAll(
-                seaTunnelSourcePluginDiscovery.getPluginJarPaths(
+                seaTunnelSourcePluginDiscovery.getPluginJarAndDependencyPaths(
                         getPluginIdentifiers(config, PluginType.SOURCE)));
         pluginJars.addAll(
-                seaTunnelSinkPluginDiscovery.getPluginJarPaths(
+                seaTunnelSinkPluginDiscovery.getPluginJarAndDependencyPaths(
                         getPluginIdentifiers(config, PluginType.SINK)));
         return pluginJars.stream()
                 .map(url -> new File(url.getPath()).toPath())
+                .distinct()
                 .collect(Collectors.toList());
     }
 
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 77fdda26fc..c2e4e055c9 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -129,7 +129,8 @@ public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<
                             PluginType.SOURCE.getType(),
                             sourceConfig.getString(PLUGIN_NAME.key()));
             jars.addAll(
-                    
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+                    sourcePluginDiscovery.getPluginJarAndDependencyPaths(
+                            Lists.newArrayList(pluginIdentifier)));
             Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> source =
                     FactoryUtil.createAndPrepareSource(
                             ReadonlyConfig.fromConfig(sourceConfig),
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index efe3824145..4ced4137a7 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -30,12 +30,26 @@ import 
org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 @Slf4j
 public class SparkExecution implements TaskExecution {
+
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private final SparkRuntimeEnvironment sparkRuntimeEnvironment;
     private final PluginExecuteProcessor<DatasetTableInfo, 
SparkRuntimeEnvironment>
             sourcePluginExecuteProcessor;
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
index 28c6b4f012..cd01efaf40 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
@@ -18,9 +18,7 @@
 package org.apache.seatunnel.engine.core.job;
 
 import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
@@ -31,10 +29,8 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -70,10 +66,18 @@ public abstract class AbstractJobEnvironment {
 
     protected Set<URL> searchPluginJars() {
         try {
-            if (Files.exists(Common.pluginRootDir())) {
-                return new 
HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
-            }
-        } catch (IOException | SeaTunnelEngineException e) {
+            return new HashSet<>(
+                    
Common.getPluginsJarDependenciesWithoutConnectorDependency().stream()
+                            .map(
+                                    p -> {
+                                        try {
+                                            return p.toUri().toURL();
+                                        } catch (MalformedURLException e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    })
+                            .collect(Collectors.toList()));
+        } catch (Exception e) {
             LOGGER.warning(
                     String.format("Can't search plugin jars in %s.", 
Common.pluginRootDir()), e);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
deleted file mode 100644
index 87b1a4e7cd..0000000000
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.parse;
-
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PluginIdentifier;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.constants.CollectionConstants;
-import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-
-import scala.Serializable;
-
-import java.net.URL;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class ConnectorInstanceLoader {
-    private ConnectorInstanceLoader() {
-        throw new IllegalStateException("Utility class");
-    }
-
-    public static ImmutablePair<SeaTunnelSource, Set<URL>> loadSourceInstance(
-            Config sourceConfig, JobContext jobContext, List<URL> pluginJars) {
-        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new 
SeaTunnelSourcePluginDiscovery();
-        PluginIdentifier pluginIdentifier =
-                PluginIdentifier.of(
-                        CollectionConstants.SEATUNNEL_PLUGIN,
-                        CollectionConstants.SOURCE_PLUGIN,
-                        
sourceConfig.getString(CollectionConstants.PLUGIN_NAME));
-
-        List<URL> pluginJarPaths =
-                
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
-
-        SeaTunnelSource seaTunnelSource =
-                sourcePluginDiscovery.createPluginInstance(pluginIdentifier, 
pluginJars);
-        return new ImmutablePair<>(seaTunnelSource, new 
HashSet<>(pluginJarPaths));
-    }
-
-    public static ImmutablePair<
-                    SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>, Set<URL>>
-            loadSinkInstance(Config sinkConfig, JobContext jobContext, 
List<URL> pluginJars) {
-        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
-        PluginIdentifier pluginIdentifier =
-                PluginIdentifier.of(
-                        CollectionConstants.SEATUNNEL_PLUGIN,
-                        CollectionConstants.SINK_PLUGIN,
-                        sinkConfig.getString(CollectionConstants.PLUGIN_NAME));
-        List<URL> pluginJarPaths =
-                
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
-        SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> 
seaTunnelSink =
-                sinkPluginDiscovery.createPluginInstance(pluginIdentifier, 
pluginJars);
-        return new ImmutablePair<>(seaTunnelSink, new 
HashSet<>(pluginJarPaths));
-    }
-
-    public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>> 
loadTransformInstance(
-            Config transformConfig, JobContext jobContext, List<URL> 
pluginJars) {
-        SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
-                new SeaTunnelTransformPluginDiscovery();
-        PluginIdentifier pluginIdentifier =
-                PluginIdentifier.of(
-                        CollectionConstants.SEATUNNEL_PLUGIN,
-                        CollectionConstants.TRANSFORM_PLUGIN,
-                        
transformConfig.getString(CollectionConstants.PLUGIN_NAME));
-
-        List<URL> pluginJarPaths =
-                
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
-        SeaTunnelTransform<?> seaTunnelTransform =
-                
transformPluginDiscovery.createPluginInstance(pluginIdentifier, pluginJars);
-        return new ImmutablePair<>(seaTunnelTransform, new 
HashSet<>(pluginJarPaths));
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 377040ff95..b5b27db163 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -78,6 +78,7 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -104,6 +105,18 @@ import static 
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputId
 @Slf4j
 public class MultipleTableJobConfigParser {
 
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private final IdGenerator idGenerator;
     private final JobConfig jobConfig;
 
@@ -289,7 +302,8 @@ public class MultipleTableJobConfigParser {
                                                 factory))
                         .collect(Collectors.toList());
         List<URL> jarPaths = new ArrayList<>();
-        jarPaths.addAll(new 
SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds));
+        jarPaths.addAll(
+                new 
SeaTunnelSinkPluginDiscovery().getPluginJarAndDependencyPaths(factoryIds));
         jarPaths.addAll(commonPluginJars);
         return jarPaths;
     }
@@ -749,7 +763,8 @@ public class MultipleTableJobConfigParser {
                         CollectionConstants.SOURCE_PLUGIN,
                         
sourceConfig.getString(CollectionConstants.PLUGIN_NAME));
         List<URL> pluginJarPaths =
-                
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
+                sourcePluginDiscovery.getPluginJarAndDependencyPaths(
+                        Lists.newArrayList(pluginIdentifier));
         return pluginJarPaths;
     }
 
@@ -774,7 +789,8 @@ public class MultipleTableJobConfigParser {
                         CollectionConstants.SINK_PLUGIN,
                         sinkConfig.getString(CollectionConstants.PLUGIN_NAME));
         List<URL> pluginJarPaths =
-                
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
+                sinkPluginDiscovery.getPluginJarAndDependencyPaths(
+                        Lists.newArrayList(pluginIdentifier));
         return pluginJarPaths;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 98bd6210ef..199fb85b42 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -50,6 +50,7 @@ import lombok.Getter;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
+import java.sql.DriverManager;
 import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -59,6 +60,18 @@ import java.util.concurrent.TimeUnit;
 public class SeaTunnelServer
         implements ManagedService, MembershipAwareService, 
LiveOperationsTracker {
 
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private static final ILogger LOGGER = 
Logger.getLogger(SeaTunnelServer.class);
 
     public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 0f8ed3755a..194c080633 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -49,11 +49,14 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -124,10 +127,7 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
 
     protected static Config loadConnectorPluginConfig() {
         return 
ConfigFactory.parseFile(Common.connectorDir().resolve(PLUGIN_MAPPING_FILE).toFile())
-                
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                .resolveWith(
-                        ConfigFactory.systemProperties(),
-                        
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+                
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
     }
 
     @Override
@@ -141,6 +141,32 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
                 .collect(Collectors.toList());
     }
 
+    @Override
+    public List<URL> getPluginJarAndDependencyPaths(List<PluginIdentifier> 
pluginIdentifiers) {
+        return pluginIdentifiers.stream()
+                .flatMap(
+                        pluginIdentifier -> {
+                            try {
+                                List<URL> jars = 
getPluginDependencyJarPaths(pluginIdentifier);
+                                
getPluginJarPath(pluginIdentifier).ifPresent(jars::addAll);
+                                log.info(
+                                        "find connector jar and dependency for 
{}: {}",
+                                        pluginIdentifier,
+                                        jars);
+                                return jars.stream();
+                            } catch (IOException e) {
+                                log.warn(
+                                        "get plugin dependency jar path 
failed, pluginIdentifier: {}",
+                                        pluginIdentifier,
+                                        e);
+                                return Stream.empty();
+                            }
+                        })
+                .distinct()
+                .sorted(Comparator.comparing(URL::toString))
+                .collect(Collectors.toList());
+    }
+
     @Override
     public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
         return pluginIdentifiers.stream()
@@ -400,13 +426,7 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
      */
     protected abstract Class<T> getPluginBaseClass();
 
-    /**
-     * Find the plugin jar path;
-     *
-     * @param pluginIdentifier plugin identifier.
-     * @return plugin jar path.
-     */
-    private Optional<List<URL>> findPluginJarPath(PluginIdentifier 
pluginIdentifier) {
+    private Optional<String> getPluginMappingPrefix(PluginIdentifier 
pluginIdentifier) {
         final String engineType = 
pluginIdentifier.getEngineType().toLowerCase();
         final String pluginType = 
pluginIdentifier.getPluginType().toLowerCase();
         final String pluginName = 
pluginIdentifier.getPluginName().toLowerCase();
@@ -422,15 +442,28 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
                 typeConfig.entrySet().stream()
                         .filter(entry -> 
StringUtils.equalsIgnoreCase(entry.getKey(), pluginName))
                         .findFirst();
-        if (!optional.isPresent()) {
+        return optional.map(entry -> entry.getValue().unwrapped().toString());
+    }
+
+    /**
+     * Find the plugin jar path;
+     *
+     * @param pluginIdentifier plugin identifier.
+     * @return plugin jar path.
+     */
+    private Optional<List<URL>> findPluginJarPath(PluginIdentifier 
pluginIdentifier) {
+        Optional<String> pluginPrefix = 
getPluginMappingPrefix(pluginIdentifier);
+        if (!pluginPrefix.isPresent()) {
             return Optional.empty();
         }
-        String pluginJarPrefix = 
optional.get().getValue().unwrapped().toString();
+        final String pluginName = 
pluginIdentifier.getPluginName().toLowerCase();
+        final String pluginType = 
pluginIdentifier.getPluginType().toLowerCase();
         File[] targetPluginFiles =
                 pluginDir
                         .toFile()
                         .listFiles(
-                                pathname -> filterPluginJar(pathname, 
pluginJarPrefix, pluginName));
+                                pathname ->
+                                        filterPluginJar(pathname, 
pluginPrefix.get(), pluginName));
         if (ArrayUtils.isEmpty(targetPluginFiles)) {
             return Optional.empty();
         }
@@ -441,7 +474,8 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
                 pluginJarPaths = 
Collections.singletonList(targetPluginFiles[0].toURI().toURL());
             } else {
                 pluginJarPaths =
-                        selectPluginJar(targetPluginFiles, pluginJarPrefix, 
pluginName, type).get();
+                        selectPluginJar(targetPluginFiles, pluginPrefix.get(), 
pluginName, type)
+                                .get();
             }
         } catch (MalformedURLException e) {
             throw new RuntimeException(e);
@@ -450,6 +484,34 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
         return Optional.of(pluginJarPaths);
     }
 
+    private List<URL> getPluginDependencyJarPaths(PluginIdentifier 
pluginIdentifier)
+            throws IOException {
+        Optional<String> pluginPrefix = 
getPluginMappingPrefix(pluginIdentifier);
+        if (!pluginPrefix.isPresent()) {
+            return Collections.emptyList();
+        }
+        List<URL> jars = new ArrayList<>();
+        Path pluginRootDir = Common.pluginRootDir();
+        if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) 
{
+            return new ArrayList<>();
+        }
+        for (File file : pluginRootDir.toFile().listFiles()) {
+            // only read current connector dependency and other common 
dependency
+            if (file.isDirectory()
+                    && (!file.getName().startsWith("connector-")
+                            || 
file.getName().equalsIgnoreCase(pluginPrefix.get()))) {
+                jars.addAll(
+                        FileUtils.searchJarFiles(
+                                Paths.get(Common.pluginRootDir().toString(), 
file.getName())));
+            } else if (!file.isDirectory()) {
+                jars.add(file.toURI().toURL());
+            }
+        }
+        return jars.stream()
+                .filter(path -> path.toString().endsWith(".jar"))
+                .collect(Collectors.toList());
+    }
+
     private boolean filterPluginJar(File pathname, String pluginJarPrefix, 
String pluginName) {
         if (pluginName.contains("cdc")) {
             return pathname.getName().endsWith(".jar")
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index 0d631745eb..36eb46aadb 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -44,6 +44,13 @@ public interface PluginDiscovery<T> {
      */
     List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers);
 
+    /**
+     * Get all plugin dependency jar paths.
+     *
+     * @return plugin dependency jars.
+     */
+    List<URL> getPluginJarAndDependencyPaths(List<PluginIdentifier> 
pluginIdentifiers);
+
     /**
      * Get plugin instance by plugin identifier.
      *
diff --git 
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
 
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
index a2cc1f95d1..90b99e49f4 100644
--- 
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
+++ 
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 
 import org.junit.jupiter.api.AfterEach;
@@ -32,9 +33,11 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -65,6 +68,28 @@ class SeaTunnelSourcePluginDiscoveryTest {
             Lists.newArrayList(
                     Paths.get(seatunnelHome, "connectors", 
"connector-http-jira.jar"),
                     Paths.get(seatunnelHome, "connectors", 
"connector-http.jar"),
+                    Paths.get(seatunnelHome, "connectors", 
"connector-clickhouse.jar"),
+                    Paths.get(
+                            seatunnelHome,
+                            "plugins",
+                            "connector-clickhouse",
+                            "clickhouse-jdbc-driver.jar"),
+                    Paths.get(
+                            seatunnelHome,
+                            "plugins",
+                            "connector-clickhouse",
+                            "clickhouse-jdbc-driver2.jar"),
+                    Paths.get(seatunnelHome, "plugins", "connector-jdbc", 
"mysql-jdbc-driver.jar"),
+                    Paths.get(seatunnelHome, "plugins", "connector-jdbc", 
"mysql-jdbc-driver2.jar"),
+                    Paths.get(seatunnelHome, "plugins", "other", 
"common-dependency.jar"),
+                    Paths.get(seatunnelHome, "plugins", "other", 
"common-dependency2.jar"),
+                    Paths.get(seatunnelHome, "plugins", 
"common-dependency3.jar"),
+                    Paths.get(
+                            seatunnelHome,
+                            "plugins",
+                            "otherWithLib",
+                            "lib",
+                            "common-dependency3.jar"),
                     Paths.get(seatunnelHome, "connectors", 
"connector-kafka.jar"),
                     Paths.get(seatunnelHome, "connectors", 
"connector-kafka-alcs.jar"),
                     Paths.get(seatunnelHome, "connectors", 
"connector-kafka-blcs.jar"),
@@ -84,7 +109,7 @@ class SeaTunnelSourcePluginDiscoveryTest {
 
         // The file is created under target directory.
         for (Path pluginJar : pluginJars) {
-            Files.createFile(pluginJar);
+            FileUtils.createNewFile(pluginJar.toString());
         }
     }
 
@@ -180,6 +205,116 @@ class SeaTunnelSourcePluginDiscoveryTest {
                         .collect(Collectors.toList()));
     }
 
+    @Test
+    public void testGetPluginDependencies() throws MalformedURLException {
+        PluginIdentifier jdbc =
+                PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), 
"JDBC");
+        PluginIdentifier clickhouse =
+                PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), 
"ClickHouse");
+        SeaTunnelSourcePluginDiscovery discovery = new 
SeaTunnelSourcePluginDiscovery();
+        List<String> jdbcAndClickHouseJars =
+                
discovery.getPluginJarAndDependencyPaths(Lists.newArrayList(jdbc, clickhouse))
+                        .stream()
+                        .map(
+                                url -> {
+                                    try {
+                                        return new File(url.toURI()).getPath();
+                                    } catch (Exception e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                        .collect(Collectors.toList());
+        Assertions.assertIterableEquals(
+                Lists.newArrayList(
+                        Paths.get(seatunnelHome, 
"/connectors/connector-clickhouse.jar").toString(),
+                        Paths.get(seatunnelHome, 
"/connectors/connector-jdbc-release-1.1.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/common-dependency3.jar").toString(),
+                        Paths.get(
+                                        seatunnelHome,
+                                        
"/plugins/connector-clickhouse/clickhouse-jdbc-driver.jar")
+                                .toString(),
+                        Paths.get(
+                                        seatunnelHome,
+                                        
"/plugins/connector-clickhouse/clickhouse-jdbc-driver2.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/connector-jdbc/mysql-jdbc-driver.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/connector-jdbc/mysql-jdbc-driver2.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/other/common-dependency.jar").toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/other/common-dependency2.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/otherWithLib/lib/common-dependency3.jar")
+                                .toString()),
+                jdbcAndClickHouseJars);
+        List<String> jdbcJars =
+                
discovery.getPluginJarAndDependencyPaths(Lists.newArrayList(jdbc)).stream()
+                        .map(
+                                url -> {
+                                    try {
+                                        return new File(url.toURI()).getPath();
+                                    } catch (Exception e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                        .collect(Collectors.toList());
+        Assertions.assertIterableEquals(
+                Lists.newArrayList(
+                        Paths.get(seatunnelHome, 
"/connectors/connector-jdbc-release-1.1.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/common-dependency3.jar").toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/connector-jdbc/mysql-jdbc-driver.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/connector-jdbc/mysql-jdbc-driver2.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/other/common-dependency.jar").toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/other/common-dependency2.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/otherWithLib/lib/common-dependency3.jar")
+                                .toString()),
+                jdbcJars);
+        List<String> clickhouseJars =
+                
discovery.getPluginJarAndDependencyPaths(Lists.newArrayList(clickhouse)).stream()
+                        .map(
+                                url -> {
+                                    try {
+                                        return new File(url.toURI()).getPath();
+                                    } catch (Exception e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                        .collect(Collectors.toList());
+        Assertions.assertIterableEquals(
+                Lists.newArrayList(
+                        Paths.get(seatunnelHome, 
"/connectors/connector-clickhouse.jar").toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/common-dependency3.jar").toString(),
+                        Paths.get(
+                                        seatunnelHome,
+                                        
"/plugins/connector-clickhouse/clickhouse-jdbc-driver.jar")
+                                .toString(),
+                        Paths.get(
+                                        seatunnelHome,
+                                        
"/plugins/connector-clickhouse/clickhouse-jdbc-driver2.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/other/common-dependency.jar").toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/other/common-dependency2.jar")
+                                .toString(),
+                        Paths.get(seatunnelHome, 
"/plugins/otherWithLib/lib/common-dependency3.jar")
+                                .toString()),
+                clickhouseJars);
+    }
+
+    @Test
+    public void testGetPluginsJarDependenciesWithoutConnectorDependency() {
+        List<Path> paths = 
Common.getPluginsJarDependenciesWithoutConnectorDependency();
+        Assertions.assertIterableEquals(
+                Collections.singletonList(
+                        Paths.get(
+                                seatunnelHome, 
"/plugins/otherWithLib/lib/common-dependency3.jar")),
+                paths);
+    }
+
     @AfterEach
     public void after() throws IOException {
         for (Path pluginJar : pluginJars) {
diff --git 
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
 
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
index f4c561c1bb..99959263a1 100644
--- 
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
+++ 
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
@@ -19,6 +19,8 @@ seatunnel.source.HttpBase = connector-http
 seatunnel.sink.HttpBase = connector-http
 seatunnel.source.HttpJira = connector-http-jira
 seatunnel.sink.HttpJira = connector-http-jira
+seatunnel.source.Clickhouse = connector-clickhouse
+seatunnel.sink.Clickhouse = connector-clickhouse
 seatunnel.source.Kafka = connector-kafka
 seatunnel.sink.Kafka = connector-kafka
 seatunnel.source.Kafka-Alcs = connector-kafka-alcs
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 842ee173dd..472928a3a0 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,6 +50,19 @@ import java.util.stream.Collectors;
 @Slf4j
 public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends 
Serializable>
         implements BaseSourceFunction<T> {
+
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     protected static final long SLEEP_TIME_INTERVAL = 5L;
     protected final SeaTunnelSource<T, SplitT, StateT> source;
     protected final Map<Integer, List<byte[]>> restoredState;
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index ed794a5b6c..f3e768930c 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,6 +45,19 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 public class ParallelSource<T, SplitT extends SourceSplit, StateT extends 
Serializable>
         implements BaseSourceFunction<T> {
+
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private static final Logger LOG = 
LoggerFactory.getLogger(ParallelSource.class);
 
     protected final SeaTunnelSource<T, SplitT, StateT> source;
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 022ef3224c..f961b67b92 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
+import java.sql.DriverManager;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -46,6 +47,18 @@ import java.util.stream.Collectors;
 public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT>
         implements Sink<InputT, CommitWrapper<CommT>, 
FlinkWriterState<WriterStateT>, GlobalCommT> {
 
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, 
GlobalCommT> sink;
 
     private final List<CatalogTable> catalogTables;
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
index 7868e6d3ef..58654e1dd1 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.Serializable;
+import java.sql.DriverManager;
 
 /**
  * The source implementation of {@link Source}, used for proxy all {@link 
SeaTunnelSource} in flink.
@@ -48,6 +49,30 @@ public class FlinkSource<SplitT extends SourceSplit, 
EnumStateT extends Serializ
         implements Source<SeaTunnelRow, SplitWrapper<SplitT>, EnumStateT>,
                 ResultTypeQueryable<SeaTunnelRow> {
 
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private final SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source;
 
     private final Config envConfig;
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
index f8ed25c044..28a8f1d952 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
@@ -30,9 +30,22 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 
 import java.io.IOException;
+import java.sql.DriverManager;
 
 public class SparkDataWriterFactory<CommitInfoT, StateT> implements 
DataWriterFactory<InternalRow> {
 
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
     private final CatalogTable[] catalogTables;
     private final String jobId;
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
index 59f4c9d0cb..67e8ca8df4 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
@@ -34,6 +34,7 @@ import 
org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
 
 import java.io.IOException;
+import java.sql.DriverManager;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -43,6 +44,18 @@ import java.util.stream.Collectors;
 public class SeaTunnelBatchWrite<StateT, CommitInfoT, AggregatedCommitInfoT>
         implements BatchWrite, StreamingWrite {
 
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink;
 
     private final SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> 
aggregatedCommitter;
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
index 7d7537222e..cfd8605f74 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
@@ -31,10 +31,23 @@ import 
org.apache.spark.sql.connector.write.DataWriterFactory;
 import 
org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
 
 import java.io.IOException;
+import java.sql.DriverManager;
 
 public class SeaTunnelSparkDataWriterFactory<CommitInfoT, StateT>
         implements DataWriterFactory, StreamingDataWriterFactory {
 
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is 
uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 
9.
+        DriverManager.getDrivers();
+    }
+
     private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
     private final CatalogTable[] catalogTables;
     private final String jobId;

Reply via email to