This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4beb2b9336 [Feature][Core] Add plugin directory support for each
connector (#9650)
4beb2b9336 is described below
commit 4beb2b9336fa3da029330a45e71cc8494cd8b1c3
Author: Jia Fan <[email protected]>
AuthorDate: Mon Aug 25 15:35:46 2025 +0800
[Feature][Core] Add plugin directory support for each connector (#9650)
---
.../connector-v2/connector-isolated-dependency.md | 49 ++++++++
docs/sidebars.js | 1 +
.../connector-v2/connector-isolated-dependency.md | 50 ++++++++
docs/zh/connector-v2/source-common-options.md | 2 +-
plugins/README.md | 50 +++++++-
.../org/apache/seatunnel/common/config/Common.java | 8 +-
.../apache/seatunnel/common/utils/FileUtils.java | 4 +
.../cdc/base/source/IncrementalSource.java | 42 +++++++
.../debezium/format/DebeziumJsonFormatTest.java | 6 +
.../cdc/mongodb/MongodbIncrementalSource.java | 5 +
.../cdc/mysql/source/MySqlIncrementalSource.java | 6 +
.../source/MySqlIncrementalSourceFactory.java | 6 +
.../OpengaussIncrementalSourceFactory.java | 8 ++
.../oracle/config/OracleSourceConfigFactory.java | 4 +-
.../cdc/oracle/source/OracleIncrementalSource.java | 6 +
.../source/OracleIncrementalSourceFactory.java | 8 ++
.../postgres/source/PostgresIncrementalSource.java | 6 +
.../source/PostgresIncrementalSourceFactory.java | 8 ++
.../source/SqlServerIncrementalSource.java | 6 +
.../source/SqlServerIncrementalSourceFactory.java | 11 ++
.../seatunnel/cdc/tidb/source/TiDBSource.java | 21 ++++
.../cdc/tidb/source/TiDBSourceFactory.java | 8 ++
.../seatunnel/connectors/doris/sink/DorisSink.java | 31 ++++-
.../doris/source/DorisSourceFactory.java | 6 +
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 39 +++++-
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 7 +-
.../seatunnel/jdbc/source/JdbcSource.java | 36 ++++++
.../seatunnel/starrocks/sink/StarRocksSink.java | 21 ++++
.../starter/flink/execution/FlinkExecution.java | 24 +++-
.../flink/execution/SourceExecuteProcessor.java | 3 +-
.../seatunnel/core/starter/spark/SparkStarter.java | 6 +-
.../seatunnel/core/starter/spark/SparkStarter.java | 6 +-
.../spark/execution/SourceExecuteProcessor.java | 3 +-
.../starter/spark/execution/SparkExecution.java | 14 +++
.../engine/core/job/AbstractJobEnvironment.java | 20 +--
.../engine/core/parse/ConnectorInstanceLoader.java | 97 ---------------
.../core/parse/MultipleTableJobConfigParser.java | 22 +++-
.../seatunnel/engine/server/SeaTunnelServer.java | 13 ++
.../plugin/discovery/AbstractPluginDiscovery.java | 92 +++++++++++---
.../plugin/discovery/PluginDiscovery.java | 7 ++
.../SeaTunnelSourcePluginDiscoveryTest.java | 137 ++++++++++++++++++++-
.../duplicate/connectors/plugin-mapping.properties | 2 +
.../translation/source/CoordinatedSource.java | 14 +++
.../translation/source/ParallelSource.java | 14 +++
.../translation/flink/sink/FlinkSink.java | 13 ++
.../translation/flink/source/FlinkSource.java | 25 ++++
.../spark/sink/writer/SparkDataWriterFactory.java | 13 ++
.../spark/sink/SeaTunnelBatchWrite.java | 13 ++
.../write/SeaTunnelSparkDataWriterFactory.java | 13 ++
49 files changed, 850 insertions(+), 156 deletions(-)
diff --git a/docs/en/connector-v2/connector-isolated-dependency.md
b/docs/en/connector-v2/connector-isolated-dependency.md
new file mode 100644
index 0000000000..dfb05e79ef
--- /dev/null
+++ b/docs/en/connector-v2/connector-isolated-dependency.md
@@ -0,0 +1,49 @@
+# Connector Isolated Dependency Loading Mechanism
+
+SeaTunnel provides an isolated dependency loading mechanism for each
connector, making it easier for users to manage individual dependencies for
different connectors, while avoiding dependency conflicts and improving system
extensibility.
+When loading a connector, SeaTunnel will search for and load the connector's
own dependency jars from the `${SEATUNNEL_HOME}/plugins/connector-xxx`
directory. This ensures that the dependencies required by different connectors
do not interfere with each other, which is helpful for managing a large number
of connectors in complex environments.
+
+## Principle
+
+Each connector's dependency jars must be placed in a dedicated subdirectory
of `${SEATUNNEL_HOME}/plugins/` named `connector-xxx` (the directory must be
created manually).
+The subdirectory name is specified by the value in the `plugin-mapping` file.
When SeaTunnel starts and loads connectors, it will only load jars from the
corresponding directory, thus achieving dependency isolation.
+
+Currently, the Zeta engine ensures that jars for different connectors in the
same job are loaded separately. The other two engines still load all connector
dependency jars together, so placing different versions of jars for the same
job in Spark/Flink environments may cause dependency conflicts.
+
+## Directory Structure Example
+
+- Use `${SEATUNNEL_HOME}/connectors/plugin-mapping.properties` to get the
folder name for each connector.
+
+For example, for AmazonDynamodb, suppose the following configuration exists in
the `plugin-mapping` file:
+```
+seatunnel.source.AmazonDynamodb = connector-amazondynamodb
+```
+
+The corresponding connector dependency directory is the value
`connector-amazondynamodb`.
+
+The final directory structure is as follows:
+
+```
+SEATUNNEL_HOME/
+ plugins/
+ connector-amazondynamodb/
+ dependency1.jar
+ dependency2.jar
+ connector-xxx/
+ dependencyA.jar
+ dependencyB.jar
+```
+
+## Limitations
+
+- For the Zeta engine, please ensure that the
`${SEATUNNEL_HOME}/plugins/connector-xxx` directory structure is consistent
across all nodes. Each node must contain the same subdirectories and dependency
jars.
+- Any directory or jar whose name does not start with `connector-` is treated
as a common dependency, and all engines and connectors will load such
jars.
+- In the Zeta engine, you can achieve shared dependencies for all connectors
by placing common jars in the `${SEATUNNEL_HOME}/lib/` directory.
+
+## Verification
+
+- By checking the job logs, you can confirm that each connector only loads its
own dependency jars.
+
+ ```log
+ 2025-08-13T17:55:48.7732601Z [] 2025-08-13 17:55:47,270 INFO
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - find connector
jar and dependency for PluginIdentifier{engineType='seatunnel',
pluginType='source', pluginName='Jdbc'}:
[file:/tmp/seatunnel/plugins/Jdbc/lib/vertica-jdbc-12.0.3-0.jar,
file:/tmp/seatunnel/connectors/connector-jdbc-2.3.12-SNAPSHOT-2.12.15.jar]
+ ```
diff --git a/docs/sidebars.js b/docs/sidebars.js
index 5f1a2a28f2..cfd589ec11 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -157,6 +157,7 @@ const sidebars = {
},
"connector-v2/source-common-options",
"connector-v2/sink-common-options",
+ "connector-v2/connector-isolated-dependency",
"connector-v2/Error-Quick-Reference-Manual",
"connector-v2/Config-Encryption-Decryption"
]
diff --git a/docs/zh/connector-v2/connector-isolated-dependency.md
b/docs/zh/connector-v2/connector-isolated-dependency.md
new file mode 100644
index 0000000000..2e33fcf073
--- /dev/null
+++ b/docs/zh/connector-v2/connector-isolated-dependency.md
@@ -0,0 +1,50 @@
+# Connector 依赖隔离加载机制
+
+SeaTunnel 提供了针对每个 connector 的依赖隔离加载机制,方便用户管理不同连接器单独的依赖,同时避免依赖冲突并提升系统的可扩展性。
+当加载 connector 时,SeaTunnel 会从 `${SEATUNNEL_HOME}` 下的 `plugins/connector-xxx`
目录中,查找并加载该 connector 独立的依赖 jar。这种方式确保了不同 connector 所需的依赖不会相互影响,便于在复杂环境下管理大量
connector。
+
+## 实现原理
+
+每个 connector 需要将自己的依赖 jar 放置在 `${SEATUNNEL_HOME}/plugins/` 目录下名为 `connector-xxx`
的独立子目录中(需要手动创建)。
+子目录名称由 `plugin-mapping` 文件中的 value 值指定。SeaTunnel 启动并加载 connector 时,只会加载对应目录下的
jar,从而实现依赖的隔离。
+
+目前,Zeta 引擎会保证同一个任务不同connector的jar分开加载。其他两个引擎仍然会将所有 connector 的依赖 jar
一起加载,同一个任务放置了不同版本的jar在Spark/Flink环境可能导致依赖冲突。
+
+## 目录结构示例
+
+- 通过`${SEATUNNEL_HOME}/connectors/plugin-mapping.properties`
获取每个connector对应的文件夹目录命名。
+
+以AmazonDynamodb为例,假设在 `plugin-mapping` 文件中有以下配置:
+```
+seatunnel.source.AmazonDynamodb = connector-amazondynamodb
+```
+
+则对应的connector依赖目录就是value值 `connector-amazondynamodb`。
+
+最终的目录结构如下所示:
+
+```
+SEATUNNEL_HOME/
+ plugins/
+ connector-amazondynamodb/
+ dependency1.jar
+ dependency2.jar
+ connector-xxx/
+ dependencyA.jar
+ dependencyB.jar
+```
+
+## 限制说明
+
+- 在Zeta引擎中,请确保所有节点的 `${SEATUNNEL_HOME}/plugins/` 目录结构一致。都需要包含相同的子目录和依赖 jar。
+- 任何没有以`connector-`开头的目录或者jar都将被当作通用依赖目录处理,所有引擎和connector都会加载此类jar。
+- 在Zeta引擎中,可以通过将通用的jar放到 `${SEATUNNEL_HOME}/lib/` 目录下来实现所有 connector 的共享依赖。
+
+## 验证
+
+- 通过追踪任务日志,确认每个 connector 只加载了其独立的依赖 jar。
+
+ ```log
+ 2025-08-13T17:55:48.7732601Z [] 2025-08-13 17:55:47,270 INFO
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - find connector
jar and dependency for PluginIdentifier{engineType='seatunnel',
pluginType='source', pluginName='Jdbc'}:
[file:/tmp/seatunnel/plugins/Jdbc/lib/vertica-jdbc-12.0.3-0.jar,
file:/tmp/seatunnel/connectors/connector-jdbc-2.3.12-SNAPSHOT-2.12.15.jar]
+ ```
+
diff --git a/docs/zh/connector-v2/source-common-options.md
b/docs/zh/connector-v2/source-common-options.md
index 079acd60f8..4189c07591 100644
--- a/docs/zh/connector-v2/source-common-options.md
+++ b/docs/zh/connector-v2/source-common-options.md
@@ -2,7 +2,7 @@
sidebar_position: 3
---
-# Source Common Options
+# Source 常用选项
> Source connector 的常用参数
diff --git a/plugins/README.md b/plugins/README.md
index 0d58dfba49..dfb05e79ef 100644
--- a/plugins/README.md
+++ b/plugins/README.md
@@ -1,11 +1,49 @@
-# Introduction of plugins directory
+# Connector Isolated Dependency Loading Mechanism
-This directory used to store some third party jar package dependency by
connector running, such as jdbc drivers.
+SeaTunnel provides an isolated dependency loading mechanism for each
connector, making it easier for users to manage individual dependencies for
different connectors, while avoiding dependency conflicts and improving system
extensibility.
+When loading a connector, SeaTunnel will search for and load the connector's
own dependency jars from the `${SEATUNNEL_HOME}/plugins/connector-xxx`
directory. This ensures that the dependencies required by different connectors
do not interfere with each other, which is helpful for managing a large number
of connectors in complex environments.
-!!!Attention: If you use Zeta Engine, please add jar to `$SEATUNNEL_HOME/lib/`
directory on each node.
+## Principle
-## directory structure
+Each connector's dependency jars must be placed in a dedicated subdirectory
of `${SEATUNNEL_HOME}/plugins/` named `connector-xxx` (the directory must be
created manually).
+The subdirectory name is specified by the value in the `plugin-mapping` file.
When SeaTunnel starts and loads connectors, it will only load jars from the
corresponding directory, thus achieving dependency isolation.
-The jar dependency by connector need put in `plugins/${connector name}/lib/`
dir.
+Currently, the Zeta engine ensures that jars for different connectors in the
same job are loaded separately. The other two engines still load all connector
dependency jars together, so placing different versions of jars for the same
job in Spark/Flink environments may cause dependency conflicts.
-For example jdbc driver jars need put in
`${seatunnel_install_home}/plugins/jdbc/lib/`
\ No newline at end of file
+## Directory Structure Example
+
+- Use `${SEATUNNEL_HOME}/connectors/plugin-mapping.properties` to get the
folder name for each connector.
+
+For example, for AmazonDynamodb, suppose the following configuration exists in
the `plugin-mapping` file:
+```
+seatunnel.source.AmazonDynamodb = connector-amazondynamodb
+```
+
+The corresponding connector dependency directory is the value
`connector-amazondynamodb`.
+
+The final directory structure is as follows:
+
+```
+SEATUNNEL_HOME/
+ plugins/
+ connector-amazondynamodb/
+ dependency1.jar
+ dependency2.jar
+ connector-xxx/
+ dependencyA.jar
+ dependencyB.jar
+```
+
+## Limitations
+
+- For the Zeta engine, please ensure that the
`${SEATUNNEL_HOME}/plugins/connector-xxx` directory structure is consistent
across all nodes. Each node must contain the same subdirectories and dependency
jars.
+- Any directory or jar whose name does not start with `connector-` is treated
as a common dependency, and all engines and connectors will load such
jars.
+- In the Zeta engine, you can achieve shared dependencies for all connectors
by placing common jars in the `${SEATUNNEL_HOME}/lib/` directory.
+
+## Verification
+
+- By checking the job logs, you can confirm that each connector only loads its
own dependency jars.
+
+ ```log
+ 2025-08-13T17:55:48.7732601Z [] 2025-08-13 17:55:47,270 INFO
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - find connector
jar and dependency for PluginIdentifier{engineType='seatunnel',
pluginType='source', pluginName='Jdbc'}:
[file:/tmp/seatunnel/plugins/Jdbc/lib/vertica-jdbc-12.0.3-0.jar,
file:/tmp/seatunnel/connectors/connector-jdbc-2.3.12-SNAPSHOT-2.12.15.jar]
+ ```
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index df7fe5cef6..e22fccd1fb 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -173,7 +173,7 @@ public class Common {
}
/** return plugin's dependent jars, which located in
'plugins/${pluginName}/lib/*'. */
- public static List<Path> getPluginsJarDependencies() {
+ public static List<Path>
getPluginsJarDependenciesWithoutConnectorDependency() {
Path pluginRootDir = Common.pluginRootDir();
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir))
{
return Collections.emptyList();
@@ -183,6 +183,12 @@ public class Common {
it ->
pluginRootDir.relativize(it).getNameCount()
== PLUGIN_LIB_DIR_DEPTH)
+ .filter(
+ it ->
+ !it.getParent()
+ .getParent()
+
.getName(it.getParent().getParent().getNameCount() - 1)
+ .startsWith("connector-"))
.filter(it -> it.getParent().endsWith("lib"))
.filter(it -> it.getFileName().toString().endsWith(".jar"))
.collect(Collectors.toList());
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
index 90ff8e5d49..b228d1be1a 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -35,6 +35,7 @@ import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -45,6 +46,9 @@ import java.util.stream.Stream;
public class FileUtils {
public static List<URL> searchJarFiles(@NonNull Path directory) throws
IOException {
+ if (!directory.toFile().exists()) {
+ return new ArrayList<>();
+ }
try (Stream<Path> paths = Files.walk(directory,
FileVisitOption.FOLLOW_LINKS)) {
return paths.filter(path -> path.toString().endsWith(".jar"))
.map(
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 85340c4a3a..415aa32467 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -66,7 +66,9 @@ import
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJs
import io.debezium.relational.TableId;
import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -74,6 +76,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -82,9 +85,22 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
@NoArgsConstructor
+@Slf4j
public abstract class IncrementalSource<T, C extends SourceConfig>
implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
+ static {
+ // Load DriverManager first to avoid deadlock between DriverManager's
+ // static initialization block and specific driver class's static
+ // initialization block when two different driver classes are loading
+ // concurrently using Class.forName while DriverManager is
uninitialized
+ // before.
+ //
+ // This could happen in JDK 8 but not above as driver loading has been
+ // moved out of DriverManager's static initialization block since JDK
9.
+ DriverManager.getDrivers();
+ }
+
protected ReadonlyConfig readonlyConfig;
protected SourceConfig.Factory<C> configFactory;
protected OffsetFactory offsetFactory;
@@ -191,6 +207,8 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
public abstract OffsetFactory createOffsetFactory(ReadonlyConfig config);
+ public abstract Optional<String> driverName();
+
@Override
public Boundedness getBoundedness() {
return stopMode == StopMode.NEVER ? Boundedness.UNBOUNDED :
Boundedness.BOUNDED;
@@ -200,6 +218,14 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
@Override
public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context
readerContext)
throws Exception {
+ // Load the JDBC driver into DriverManager
+ if (driverName().isPresent()) {
+ try {
+ Class.forName(driverName().get());
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver: {}", driverName().get(),
e);
+ }
+ }
// create source config for the given subtask (e.g. unique server id)
C sourceConfig =
configFactory.create(readerContext.getIndexOfSubtask());
BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
@@ -232,6 +258,14 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
@Override
public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState>
createEnumerator(
SourceSplitEnumerator.Context<SourceSplitBase> enumeratorContext)
throws Exception {
+ // Load the JDBC driver into DriverManager
+ if (driverName().isPresent()) {
+ try {
+ Class.forName(driverName().get());
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver: {}", driverName().get(),
e);
+ }
+ }
C sourceConfig = configFactory.create(0);
final List<TableId> remainingTables =
dataSourceDialect.discoverDataCollections(sourceConfig);
@@ -273,6 +307,14 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
SourceSplitEnumerator.Context<SourceSplitBase> enumeratorContext,
PendingSplitsState checkpointState)
throws Exception {
+ // Load the JDBC driver into DriverManager
+ if (driverName().isPresent()) {
+ try {
+ Class.forName(driverName().get());
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver: {}", driverName().get(),
e);
+ }
+ }
C sourceConfig = configFactory.create(0);
Set<TableId> capturedTables =
new
HashSet<>(dataSourceDialect.discoverDataCollections(sourceConfig));
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
index adb5b7389d..0a438180a4 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
class DebeziumJsonFormatTest {
@@ -105,6 +106,11 @@ class DebeziumJsonFormatTest {
public String getPluginName() {
return "";
}
+
+ @Override
+ public Optional<String> driverName() {
+ return Optional.empty();
+ }
}
@Test
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
index 996eda76e8..a10f6ad15e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
@@ -136,4 +136,9 @@ public class MongodbIncrementalSource<T> extends
IncrementalSource<T, MongodbSou
SourceConfig sourceConfig, SourceReader.Context context) {
return new MongoDBRecordEmitter<>(deserializationSchema,
offsetFactory, context);
}
+
+ @Override
+ public Optional<String> driverName() {
+ return Optional.empty();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 50d745cacd..d75fcbe981 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -52,6 +52,7 @@ import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -163,4 +164,9 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
SchemaChangeType.RENAME_COLUMN,
SchemaChangeType.UPDATE_COLUMN);
}
+
+ @Override
+ public Optional<String> driverName() {
+ return Optional.of("com.mysql.cj.jdbc.Driver");
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index a37b2bdc0c..13f39beeec 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -103,6 +103,12 @@ public class MySqlIncrementalSourceFactory extends
BaseChangeStreamTableSourceFa
TableSource<T, SplitT, StateT> restoreSource(
TableSourceFactoryContext context, List<CatalogTable>
restoreTables) {
return () -> {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver
", e);
+ }
ReadonlyConfig config = context.getOptions();
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(config,
context.getClassLoader());
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
index bb52f01242..5418162ab0 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
@@ -38,12 +38,14 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresSou
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@AutoService(Factory.class)
+@Slf4j
public class OpengaussIncrementalSourceFactory implements TableSourceFactory {
private static final String IDENTIFIER = "Opengauss-CDC";
@@ -89,6 +91,12 @@ public class OpengaussIncrementalSourceFactory implements
TableSourceFactory {
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("org.postgresql.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver org.postgresql.Driver",
e);
+ }
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(
"Postgres", context.getOptions(),
context.getClassLoader());
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
index fa240bc158..1dda36a4be 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
@@ -73,8 +73,8 @@ public class OracleSourceConfigFactory extends
JdbcSourceConfigFactory {
try {
Class.forName(DRIVER_CLASS_NAME);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver {}", DRIVER_CLASS_NAME, e);
}
Properties props = new Properties();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
index 2b59e67209..28782f6098 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
@@ -52,6 +52,7 @@ import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -126,6 +127,11 @@ public class OracleIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceC
(OracleSourceConfigFactory) configFactory, (OracleDialect)
dataSourceDialect);
}
+ @Override
+ public Optional<String> driverName() {
+ return Optional.of("oracle.jdbc.OracleDriver");
+ }
+
private Map<TableId, Struct> tableChanges() {
JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
OracleDialect dialect =
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
index 5a22cf09e6..7b32421a7c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
@@ -38,12 +38,14 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceC
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@AutoService(Factory.class)
+@Slf4j
public class OracleIncrementalSourceFactory extends
BaseChangeStreamTableSourceFactory {
@Override
public String factoryIdentifier() {
@@ -105,6 +107,12 @@ public class OracleIncrementalSourceFactory extends
BaseChangeStreamTableSourceF
TableSource<T, SplitT, StateT> restoreSource(
TableSourceFactoryContext context, List<CatalogTable>
restoreTables) {
return () -> {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("oracle.jdbc.OracleDriver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver {}",
"oracle.jdbc.OracleDriver", e);
+ }
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
index ed8ccb254b..a9e10e9516 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
@@ -50,6 +50,7 @@ import io.debezium.util.SchemaNameAdjuster;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -123,6 +124,11 @@ public class PostgresIncrementalSource<T> extends
IncrementalSource<T, JdbcSourc
(PostgresSourceConfigFactory) configFactory, (PostgresDialect)
dataSourceDialect);
}
+ @Override
+ public Optional<String> driverName() {
+ return Optional.of("org.postgresql.Driver");
+ }
+
private Map<TableId, Struct> tableChanges() {
JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
PostgresDialect dialect =
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSourceFactory.java
index 3a248ee37e..91e8f3c881 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSourceFactory.java
@@ -36,12 +36,14 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.option.PostgresOpt
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@AutoService(Factory.class)
+@Slf4j
public class PostgresIncrementalSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
@@ -85,6 +87,12 @@ public class PostgresIncrementalSourceFactory implements
TableSourceFactory {
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("org.postgresql.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver {}",
"org.postgresql.Driver", e);
+ }
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSource.java
index 0b82a254c3..0327b89b06 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSource.java
@@ -41,6 +41,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
import java.time.ZoneId;
import java.util.List;
+import java.util.Optional;
public class SqlServerIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig>
implements SupportParallelism {
@@ -108,4 +109,9 @@ public class SqlServerIncrementalSource<T> extends
IncrementalSource<T, JdbcSour
return new LsnOffsetFactory(
(SqlServerSourceConfigFactory) configFactory,
(SqlServerDialect) dataSourceDialect);
}
+
+ @Override
+ public Optional<String> driverName() {
+ return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceFactory.java
index ca124c25e5..96b661f240 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceFactory.java
@@ -37,12 +37,14 @@ import
org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@AutoService(Factory.class)
+@Slf4j
public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
@Override
@@ -101,6 +103,15 @@ public class SqlServerIncrementalSourceFactory implements
TableSourceFactory {
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+ } catch (Exception e) {
+ log.warn(
+ "Failed to load JDBC driver {}",
+ "com.microsoft.sqlserver.jdbc.SQLServerDriver",
+ e);
+ }
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
index fef7adab34..5827b1c72d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
@@ -33,9 +33,12 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDB
import
org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader;
import
org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.Collections;
import java.util.List;
+@Slf4j
public class TiDBSource
implements SeaTunnelSource<SeaTunnelRow, TiDBSourceSplit,
TiDBSourceCheckpointState>,
SupportParallelism,
@@ -91,6 +94,12 @@ public class TiDBSource
@Override
public SourceReader<SeaTunnelRow, TiDBSourceSplit>
createReader(SourceReader.Context context)
throws Exception {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ",
e);
+ }
return new TiDBSourceReader(context, config, catalogTable);
}
@@ -105,6 +114,12 @@ public class TiDBSource
@Override
public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState>
createEnumerator(
SourceSplitEnumerator.Context<TiDBSourceSplit> context) throws
Exception {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ",
e);
+ }
return new TiDBSourceSplitEnumerator(context, config);
}
@@ -122,6 +137,12 @@ public class TiDBSource
SourceSplitEnumerator.Context<TiDBSourceSplit> context,
TiDBSourceCheckpointState checkpointState)
throws Exception {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ",
e);
+ }
return new TiDBSourceSplitEnumerator(context, config, checkpointState);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
index 39d8b03739..254bc584c9 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
@@ -32,10 +32,12 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalog;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
@AutoService(Factory.class)
+@Slf4j
public class TiDBSourceFactory implements TableSourceFactory {
/**
* Returns a unique identifier among same factory interfaces.
@@ -87,6 +89,12 @@ public class TiDBSourceFactory implements TableSourceFactory
{
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver
", e);
+ }
ReadonlyConfig config = context.getOptions();
TiDBCatalogFactory catalogFactory = new TiDBCatalogFactory();
// Build tidb catalog.
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 66b19e3971..88bee59012 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -45,6 +45,8 @@ import
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
import
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkStateSerializer;
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -53,6 +55,7 @@ import java.util.Optional;
import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+@Slf4j
public class DorisSink
implements SeaTunnelSink<SeaTunnelRow, DorisSinkState,
DorisCommitInfo, DorisCommitInfo>,
SupportSaveMode,
@@ -68,6 +71,12 @@ public class DorisSink
this.config = config;
this.catalogTable = catalogTable;
this.dorisSinkConfig = DorisSinkConfig.of(config);
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ",
e);
+ }
}
@Override
@@ -82,6 +91,12 @@ public class DorisSink
@Override
public DorisSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ",
e);
+ }
return new DorisSinkWriter(
context, Collections.emptyList(), catalogTable,
dorisSinkConfig, jobId);
}
@@ -89,6 +104,12 @@ public class DorisSink
@Override
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>
restoreWriter(
SinkWriter.Context context, List<DorisSinkState> states) throws
IOException {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ",
e);
+ }
return new DorisSinkWriter(context, states, catalogTable,
dorisSinkConfig, jobId);
}
@@ -99,6 +120,12 @@ public class DorisSink
@Override
public Optional<SinkCommitter<DorisCommitInfo>> createCommitter() throws
IOException {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ",
e);
+ }
return Optional.of(new DorisCommitter(dorisSinkConfig));
}
@@ -112,8 +139,8 @@ public class DorisSink
// Load the JDBC driver in to DriverManager
try {
Class.forName("com.mysql.cj.jdbc.Driver");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ",
e);
}
CatalogFactory catalogFactory =
discoverFactory(
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
index 1bdbd86883..d025eaa061 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
@@ -80,6 +80,12 @@ public class DorisSourceFactory implements
TableSourceFactory {
@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver com.mysql.cj.jdbc.Driver ",
e);
+ }
DorisSourceConfig dorisSourceConfig =
DorisSourceConfig.of(context.getOptions());
List<DorisTableConfig> dorisTableConfigList =
dorisSourceConfig.getTableConfigList();
Map<TablePath, DorisSourceTable> dorisSourceTables = new HashMap<>();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 62b47a3f97..ee61b27557 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -93,6 +93,15 @@ public class JdbcSink
SchemaSaveMode schemaSaveMode,
DataSaveMode dataSaveMode,
CatalogTable catalogTable) {
+ // Load the JDBC driver into DriverManager
+ try {
+
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (Exception e) {
+ log.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
+ }
this.config = config;
this.jdbcSinkConfig = jdbcSinkConfig;
this.dialect = dialect;
@@ -111,8 +120,11 @@ public class JdbcSink
public AbstractJdbcSinkWriter createWriter(SinkWriter.Context context) {
try {
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ log.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
}
TablePath sinkTablePath = catalogTable.getTablePath();
AbstractJdbcSinkWriter sinkWriter;
@@ -158,8 +170,11 @@ public class JdbcSink
SinkWriter.Context context, List<JdbcSinkState> states) throws
IOException {
try {
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ log.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
}
TablePath sinkTablePath = catalogTable.getTablePath();
if (jdbcSinkConfig.isExactlyOnce()) {
@@ -204,6 +219,15 @@ public class JdbcSink
@Override
public Optional<SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo>>
createAggregatedCommitter() {
+ // Load the JDBC driver into DriverManager
+ try {
+
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (Exception e) {
+ log.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
+ }
if (jdbcSinkConfig.isExactlyOnce()) {
return Optional.of(new
JdbcSinkAggregatedCommitter(jdbcSinkConfig));
}
@@ -235,8 +259,11 @@ public class JdbcSink
public Optional<SaveModeHandler> getSaveModeHandler() {
try {
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ log.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
}
if (catalogTable != null) {
Optional<Catalog> catalogOptional = getCatalog();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 6ffa0acd51..3137f427e8 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -76,8 +76,11 @@ public class JdbcSinkWriter extends
AbstractJdbcSinkWriter<ConnectionPoolManager
HikariDataSource ds = new HikariDataSource();
try {
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ log.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSinkConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
}
ds.setIdleTimeout(30 * 1000);
ds.setMaximumPoolSize(queueSize);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 8443aeb044..616842bc7f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -52,6 +52,15 @@ public class JdbcSource
@SneakyThrows
public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
+ // Load the JDBC driver into DriverManager
+ try {
+
Class.forName(jdbcSourceConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSourceConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
+ }
this.jdbcSourceConfig = jdbcSourceConfig;
this.jdbcSourceTables =
JdbcCatalogUtils.getTables(
@@ -79,6 +88,15 @@ public class JdbcSource
@Override
public SourceReader<SeaTunnelRow, JdbcSourceSplit> createReader(
SourceReader.Context readerContext) throws Exception {
+ // Load the JDBC driver into DriverManager
+ try {
+
Class.forName(jdbcSourceConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSourceConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
+ }
Map<TablePath, CatalogTable> tables = new HashMap<>();
for (TablePath tablePath : jdbcSourceTables.keySet()) {
tables.put(tablePath,
jdbcSourceTables.get(tablePath).getCatalogTable());
@@ -94,6 +112,15 @@ public class JdbcSource
@Override
public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState>
createEnumerator(
SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext)
throws Exception {
+ // Load the JDBC driver into DriverManager
+ try {
+
Class.forName(jdbcSourceConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSourceConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
+ }
return new JdbcSourceSplitEnumerator(
enumeratorContext, jdbcSourceConfig, jdbcSourceTables, null);
}
@@ -103,6 +130,15 @@ public class JdbcSource
SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext,
JdbcSourceState checkpointState)
throws Exception {
+ // Load the JDBC driver into DriverManager
+ try {
+
Class.forName(jdbcSourceConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to load JDBC driver {}",
+ jdbcSourceConfig.getJdbcConnectionConfig().getDriverName(),
+ e);
+ }
return new JdbcSourceSplitEnumerator(
enumeratorContext, jdbcSourceConfig, jdbcSourceTables,
checkpointState);
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 06fdbb97bd..1b89ef3f4e 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -37,10 +37,13 @@ import
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCata
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+@Slf4j
public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportSaveMode, SupportSchemaEvolutionSink,
SupportMultiTableSink {
@@ -56,6 +59,12 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
this.catalogTable = catalogTable;
this.dataSaveMode = sinkConfig.getDataSaveMode();
this.schemaSaveMode = sinkConfig.getSchemaSaveMode();
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver {}",
"com.mysql.cj.jdbc.Driver", e);
+ }
}
@Override
@@ -65,12 +74,24 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
@Override
public StarRocksSinkWriter createWriter(SinkWriter.Context context) {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver {}",
"com.mysql.cj.jdbc.Driver", e);
+ }
TablePath sinkTablePath = catalogTable.getTablePath();
return new StarRocksSinkWriter(sinkConfig, tableSchema, sinkTablePath);
}
@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (Exception e) {
+ log.warn("Failed to load JDBC driver {}",
"com.mysql.cj.jdbc.Driver", e);
+ }
TablePath tablePath =
TablePath.of(
catalogTable.getTableId().getDatabaseName(),
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 0d74c59c79..ae2c5039d4 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -45,9 +45,12 @@ import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
+import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -56,6 +59,18 @@ import java.util.stream.Stream;
/** Used to execute a SeaTunnelTask. */
public class FlinkExecution implements TaskExecution {
+ static {
+ // Load DriverManager first to avoid a deadlock between DriverManager's
+ // static initialization block and a specific driver class's static
+ // initialization block, which can occur when two different driver
+ // classes are loaded concurrently via Class.forName while DriverManager
+ // has not been initialized yet.
+ //
+ // This can happen on JDK 8 but not later, as driver loading has been
+ // moved out of DriverManager's static initialization block since JDK 9.
+ DriverManager.getDrivers();
+ }
+
private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkExecution.class);
private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
@@ -102,7 +117,8 @@ public class FlinkExecution implements TaskExecution {
jarPaths, envConfig,
config.getConfigList(Constants.SINK), jobContext);
this.flinkRuntimeEnvironment =
-
FlinkRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths));
+ FlinkRuntimeEnvironment.getInstance(
+ this.registerPlugin(config, new HashSet<>(jarPaths)));
this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
@@ -154,7 +170,7 @@ public class FlinkExecution implements TaskExecution {
Common.getThirdPartyJars(
envConfig.getString(EnvCommonOptions.JARS.key())));
}
- thirdPartyJars.addAll(Common.getPluginsJarDependencies());
+
thirdPartyJars.addAll(Common.getPluginsJarDependenciesWithoutConnectorDependency());
List<URL> jarDependencies =
Stream.concat(thirdPartyJars.stream(),
Common.getLibJars().stream())
.map(Path::toUri)
@@ -173,7 +189,7 @@ public class FlinkExecution implements TaskExecution {
jarPaths.addAll(jarDependencies);
}
- private Config registerPlugin(Config config, List<URL> jars) {
+ private Config registerPlugin(Config config, Collection<URL> jars) {
config =
this.injectJarsToConfig(
config, ConfigUtil.joinPath("env", "pipeline",
"jars"), jars);
@@ -181,7 +197,7 @@ public class FlinkExecution implements TaskExecution {
config, ConfigUtil.joinPath("env", "pipeline", "classpaths"),
jars);
}
- private Config injectJarsToConfig(Config config, String path, List<URL>
jars) {
+ private Config injectJarsToConfig(Config config, String path,
Collection<URL> jars) {
List<URL> validJars = new ArrayList<>();
for (URL jarUrl : jars) {
if (new File(jarUrl.getFile()).exists()) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 29479ceba6..182592e9ca 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -117,7 +117,8 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
PluginType.SOURCE.getType(),
sourceConfig.getString(PLUGIN_NAME.key()));
jars.addAll(
-
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+ sourcePluginDiscovery.getPluginJarAndDependencyPaths(
+ Lists.newArrayList(pluginIdentifier)));
Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> source =
FactoryUtil.createAndPrepareSource(
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 787c153ba3..52f1a2ee99 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -111,7 +111,6 @@ public class SparkStarter implements Starter {
setSparkConf();
Common.setDeployMode(commandArgs.getDeployMode());
Common.setStarter(true);
- this.jars.addAll(Common.getPluginsJarDependencies());
this.jars.addAll(Common.getLibJars());
this.jars.addAll(getConnectorJarDependencies());
this.jars.addAll(
@@ -152,13 +151,14 @@ public class SparkStarter implements Starter {
SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery();
pluginJars.addAll(
- seaTunnelSourcePluginDiscovery.getPluginJarPaths(
+ seaTunnelSourcePluginDiscovery.getPluginJarAndDependencyPaths(
getPluginIdentifiers(config, PluginType.SOURCE)));
pluginJars.addAll(
- seaTunnelSinkPluginDiscovery.getPluginJarPaths(
+ seaTunnelSinkPluginDiscovery.getPluginJarAndDependencyPaths(
getPluginIdentifiers(config, PluginType.SINK)));
return pluginJars.stream()
.map(url -> new File(url.getPath()).toPath())
+ .distinct()
.collect(Collectors.toList());
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 555aeaed8f..e734112e85 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -111,7 +111,6 @@ public class SparkStarter implements Starter {
setSparkConf();
Common.setDeployMode(commandArgs.getDeployMode());
Common.setStarter(true);
- this.jars.addAll(Common.getPluginsJarDependencies());
this.jars.addAll(Common.getLibJars());
this.jars.addAll(getConnectorJarDependencies());
this.jars.addAll(
@@ -152,13 +151,14 @@ public class SparkStarter implements Starter {
SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery();
pluginJars.addAll(
- seaTunnelSourcePluginDiscovery.getPluginJarPaths(
+ seaTunnelSourcePluginDiscovery.getPluginJarAndDependencyPaths(
getPluginIdentifiers(config, PluginType.SOURCE)));
pluginJars.addAll(
- seaTunnelSinkPluginDiscovery.getPluginJarPaths(
+ seaTunnelSinkPluginDiscovery.getPluginJarAndDependencyPaths(
getPluginIdentifiers(config, PluginType.SINK)));
return pluginJars.stream()
.map(url -> new File(url.getPath()).toPath())
+ .distinct()
.collect(Collectors.toList());
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 77fdda26fc..c2e4e055c9 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -129,7 +129,8 @@ public class SourceExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<
PluginType.SOURCE.getType(),
sourceConfig.getString(PLUGIN_NAME.key()));
jars.addAll(
-
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+ sourcePluginDiscovery.getPluginJarAndDependencyPaths(
+ Lists.newArrayList(pluginIdentifier)));
Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> source =
FactoryUtil.createAndPrepareSource(
ReadonlyConfig.fromConfig(sourceConfig),
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index efe3824145..4ced4137a7 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -30,12 +30,26 @@ import
org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
import lombok.extern.slf4j.Slf4j;
+import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Slf4j
public class SparkExecution implements TaskExecution {
+
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 9.
+        DriverManager.getDrivers();
+    }
+
private final SparkRuntimeEnvironment sparkRuntimeEnvironment;
private final PluginExecuteProcessor<DatasetTableInfo,
SparkRuntimeEnvironment>
sourcePluginExecuteProcessor;
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
index 28c6b4f012..cd01efaf40 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
@@ -18,9 +18,7 @@
package org.apache.seatunnel.engine.core.job;
import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
@@ -31,10 +29,8 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.io.File;
-import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
-import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -70,10 +66,18 @@ public abstract class AbstractJobEnvironment {
protected Set<URL> searchPluginJars() {
try {
- if (Files.exists(Common.pluginRootDir())) {
- return new
HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
- }
- } catch (IOException | SeaTunnelEngineException e) {
+ return new HashSet<>(
+
Common.getPluginsJarDependenciesWithoutConnectorDependency().stream()
+ .map(
+ p -> {
+ try {
+ return p.toUri().toURL();
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList()));
+ } catch (Exception e) {
LOGGER.warning(
String.format("Can't search plugin jars in %s.",
Common.pluginRootDir()), e);
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
deleted file mode 100644
index 87b1a4e7cd..0000000000
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.parse;
-
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PluginIdentifier;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.constants.CollectionConstants;
-import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-
-import scala.Serializable;
-
-import java.net.URL;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class ConnectorInstanceLoader {
- private ConnectorInstanceLoader() {
- throw new IllegalStateException("Utility class");
- }
-
- public static ImmutablePair<SeaTunnelSource, Set<URL>> loadSourceInstance(
- Config sourceConfig, JobContext jobContext, List<URL> pluginJars) {
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new
SeaTunnelSourcePluginDiscovery();
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- CollectionConstants.SEATUNNEL_PLUGIN,
- CollectionConstants.SOURCE_PLUGIN,
-
sourceConfig.getString(CollectionConstants.PLUGIN_NAME));
-
- List<URL> pluginJarPaths =
-
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
-
- SeaTunnelSource seaTunnelSource =
- sourcePluginDiscovery.createPluginInstance(pluginIdentifier,
pluginJars);
- return new ImmutablePair<>(seaTunnelSource, new
HashSet<>(pluginJarPaths));
- }
-
- public static ImmutablePair<
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>, Set<URL>>
- loadSinkInstance(Config sinkConfig, JobContext jobContext,
List<URL> pluginJars) {
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- CollectionConstants.SEATUNNEL_PLUGIN,
- CollectionConstants.SINK_PLUGIN,
- sinkConfig.getString(CollectionConstants.PLUGIN_NAME));
- List<URL> pluginJarPaths =
-
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>
seaTunnelSink =
- sinkPluginDiscovery.createPluginInstance(pluginIdentifier,
pluginJars);
- return new ImmutablePair<>(seaTunnelSink, new
HashSet<>(pluginJarPaths));
- }
-
- public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>>
loadTransformInstance(
- Config transformConfig, JobContext jobContext, List<URL>
pluginJars) {
- SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
- new SeaTunnelTransformPluginDiscovery();
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- CollectionConstants.SEATUNNEL_PLUGIN,
- CollectionConstants.TRANSFORM_PLUGIN,
-
transformConfig.getString(CollectionConstants.PLUGIN_NAME));
-
- List<URL> pluginJarPaths =
-
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
- SeaTunnelTransform<?> seaTunnelTransform =
-
transformPluginDiscovery.createPluginInstance(pluginIdentifier, pluginJars);
- return new ImmutablePair<>(seaTunnelTransform, new
HashSet<>(pluginJarPaths));
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 377040ff95..b5b27db163 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -78,6 +78,7 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -104,6 +105,18 @@ import static
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputId
@Slf4j
public class MultipleTableJobConfigParser {
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 9.
+        DriverManager.getDrivers();
+    }
+
private final IdGenerator idGenerator;
private final JobConfig jobConfig;
@@ -289,7 +302,8 @@ public class MultipleTableJobConfigParser {
factory))
.collect(Collectors.toList());
List<URL> jarPaths = new ArrayList<>();
- jarPaths.addAll(new
SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds));
+ jarPaths.addAll(
+ new
SeaTunnelSinkPluginDiscovery().getPluginJarAndDependencyPaths(factoryIds));
jarPaths.addAll(commonPluginJars);
return jarPaths;
}
@@ -749,7 +763,8 @@ public class MultipleTableJobConfigParser {
CollectionConstants.SOURCE_PLUGIN,
sourceConfig.getString(CollectionConstants.PLUGIN_NAME));
List<URL> pluginJarPaths =
-
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
+ sourcePluginDiscovery.getPluginJarAndDependencyPaths(
+ Lists.newArrayList(pluginIdentifier));
return pluginJarPaths;
}
@@ -774,7 +789,8 @@ public class MultipleTableJobConfigParser {
CollectionConstants.SINK_PLUGIN,
sinkConfig.getString(CollectionConstants.PLUGIN_NAME));
List<URL> pluginJarPaths =
-
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
+ sinkPluginDiscovery.getPluginJarAndDependencyPaths(
+ Lists.newArrayList(pluginIdentifier));
return pluginJarPaths;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 98bd6210ef..199fb85b42 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -50,6 +50,7 @@ import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
+import java.sql.DriverManager;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -59,6 +60,18 @@ import java.util.concurrent.TimeUnit;
public class SeaTunnelServer
implements ManagedService, MembershipAwareService,
LiveOperationsTracker {
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 9.
+        DriverManager.getDrivers();
+    }
+
private static final ILogger LOGGER =
Logger.getLogger(SeaTunnelServer.class);
public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 0f8ed3755a..194c080633 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -49,11 +49,14 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -124,10 +127,7 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
protected static Config loadConnectorPluginConfig() {
return
ConfigFactory.parseFile(Common.connectorDir().resolve(PLUGIN_MAPPING_FILE).toFile())
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(
- ConfigFactory.systemProperties(),
-
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
}
@Override
@@ -141,6 +141,32 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
.collect(Collectors.toList());
}
+    /**
+     * Resolves, for every given plugin, the connector jar together with the dependency jars
+     * found under the plugin root directory.
+     *
+     * <p>If collecting the dependencies of one plugin fails with an {@link IOException}, a
+     * warning is logged and that plugin contributes no paths; the other plugins are still
+     * processed.
+     *
+     * @param pluginIdentifiers identifiers of the plugins to resolve.
+     * @return the de-duplicated jar URLs of all given plugins, sorted by their string form.
+     */
+    @Override
+    public List<URL> getPluginJarAndDependencyPaths(List<PluginIdentifier> pluginIdentifiers) {
+        return pluginIdentifiers.stream()
+                .flatMap(
+                        pluginIdentifier -> {
+                            try {
+                                // Append into a list owned by this method: the helper's
+                                // result is not guaranteed to be modifiable.
+                                List<URL> jars = new ArrayList<>();
+                                jars.addAll(getPluginDependencyJarPaths(pluginIdentifier));
+                                getPluginJarPath(pluginIdentifier).ifPresent(jars::addAll);
+                                log.info(
+                                        "find connector jar and dependency for {}: {}",
+                                        pluginIdentifier,
+                                        jars);
+                                return jars.stream();
+                            } catch (IOException e) {
+                                log.warn(
+                                        "get plugin dependency jar path failed, pluginIdentifier: {}",
+                                        pluginIdentifier,
+                                        e);
+                                return Stream.empty();
+                            }
+                        })
+                .distinct()
+                // Sorting by string form gives a stable order independent of listing order.
+                .sorted(Comparator.comparing(URL::toString))
+                .collect(Collectors.toList());
+    }
+
@Override
public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
return pluginIdentifiers.stream()
@@ -400,13 +426,7 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
*/
protected abstract Class<T> getPluginBaseClass();
- /**
- * Find the plugin jar path;
- *
- * @param pluginIdentifier plugin identifier.
- * @return plugin jar path.
- */
- private Optional<List<URL>> findPluginJarPath(PluginIdentifier
pluginIdentifier) {
+ private Optional<String> getPluginMappingPrefix(PluginIdentifier
pluginIdentifier) {
final String engineType =
pluginIdentifier.getEngineType().toLowerCase();
final String pluginType =
pluginIdentifier.getPluginType().toLowerCase();
final String pluginName =
pluginIdentifier.getPluginName().toLowerCase();
@@ -422,15 +442,28 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
typeConfig.entrySet().stream()
.filter(entry ->
StringUtils.equalsIgnoreCase(entry.getKey(), pluginName))
.findFirst();
- if (!optional.isPresent()) {
+ return optional.map(entry -> entry.getValue().unwrapped().toString());
+ }
+
+ /**
+ * Find the plugin jar path;
+ *
+ * @param pluginIdentifier plugin identifier.
+ * @return plugin jar path.
+ */
+ private Optional<List<URL>> findPluginJarPath(PluginIdentifier
pluginIdentifier) {
+ Optional<String> pluginPrefix =
getPluginMappingPrefix(pluginIdentifier);
+ if (!pluginPrefix.isPresent()) {
return Optional.empty();
}
- String pluginJarPrefix =
optional.get().getValue().unwrapped().toString();
+ final String pluginName =
pluginIdentifier.getPluginName().toLowerCase();
+ final String pluginType =
pluginIdentifier.getPluginType().toLowerCase();
File[] targetPluginFiles =
pluginDir
.toFile()
.listFiles(
- pathname -> filterPluginJar(pathname,
pluginJarPrefix, pluginName));
+ pathname ->
+ filterPluginJar(pathname,
pluginPrefix.get(), pluginName));
if (ArrayUtils.isEmpty(targetPluginFiles)) {
return Optional.empty();
}
@@ -441,7 +474,8 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
pluginJarPaths =
Collections.singletonList(targetPluginFiles[0].toURI().toURL());
} else {
pluginJarPaths =
- selectPluginJar(targetPluginFiles, pluginJarPrefix,
pluginName, type).get();
+ selectPluginJar(targetPluginFiles, pluginPrefix.get(),
pluginName, type)
+ .get();
}
} catch (MalformedURLException e) {
throw new RuntimeException(e);
@@ -450,6 +484,34 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
return Optional.of(pluginJarPaths);
}
+    /**
+     * Find the dependency jars of the given plugin under the plugin root directory.
+     *
+     * <p>Regular files located directly in the plugin root directory are taken as they are.
+     * Sub-directories are searched with {@code FileUtils.searchJarFiles} when their name does
+     * not start with {@code connector-} (common dependencies) or equals, ignoring case, the
+     * mapping prefix of this plugin; directories of other connectors are skipped. Only URLs
+     * ending in {@code .jar} are returned.
+     *
+     * @param pluginIdentifier plugin identifier.
+     * @return a modifiable list of dependency jar URLs; empty when the plugin has no mapping
+     *     prefix or the plugin root directory does not exist.
+     * @throws IOException if the plugin root directory cannot be listed or a jar search fails.
+     */
+    private List<URL> getPluginDependencyJarPaths(PluginIdentifier pluginIdentifier)
+            throws IOException {
+        // Every return hands out a modifiable list because the caller may append to it.
+        Optional<String> pluginPrefix = getPluginMappingPrefix(pluginIdentifier);
+        if (!pluginPrefix.isPresent()) {
+            return new ArrayList<>();
+        }
+        Path pluginRootDir = Common.pluginRootDir();
+        // isDirectory is already false for a missing path, no separate exists check needed.
+        if (!Files.isDirectory(pluginRootDir)) {
+            return new ArrayList<>();
+        }
+        File[] entries = pluginRootDir.toFile().listFiles();
+        if (entries == null) {
+            // listFiles returns null on an I/O error or when the directory vanished meanwhile.
+            throw new IOException("Failed to list plugin root directory: " + pluginRootDir);
+        }
+        String connectorDirName = pluginPrefix.get();
+        List<URL> jars = new ArrayList<>();
+        for (File entry : entries) {
+            if (!entry.isDirectory()) {
+                jars.add(entry.toURI().toURL());
+            } else if (!entry.getName().startsWith("connector-")
+                    || entry.getName().equalsIgnoreCase(connectorDirName)) {
+                // only read current connector dependency and other common dependency
+                jars.addAll(
+                        FileUtils.searchJarFiles(
+                                Paths.get(pluginRootDir.toString(), entry.getName())));
+            }
+        }
+        return jars.stream()
+                .filter(path -> path.toString().endsWith(".jar"))
+                .collect(Collectors.toCollection(ArrayList::new));
+    }
+
private boolean filterPluginJar(File pathname, String pluginJarPrefix,
String pluginName) {
if (pluginName.contains("cdc")) {
return pathname.getName().endsWith(".jar")
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index 0d631745eb..36eb46aadb 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -44,6 +44,13 @@ public interface PluginDiscovery<T> {
*/
List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers);
+    /**
+     * Get the jar paths of the given plugins together with the jar paths of their dependencies.
+     *
+     * @param pluginIdentifiers plugin identifiers.
+     * @return plugin jars and plugin dependency jars.
+     */
+    List<URL> getPluginJarAndDependencyPaths(List<PluginIdentifier> pluginIdentifiers);
+
/**
* Get plugin instance by plugin identifier.
*
diff --git
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
index a2cc1f95d1..90b99e49f4 100644
---
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
+++
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.junit.jupiter.api.AfterEach;
@@ -32,9 +33,11 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
+import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -65,6 +68,28 @@ class SeaTunnelSourcePluginDiscoveryTest {
Lists.newArrayList(
Paths.get(seatunnelHome, "connectors",
"connector-http-jira.jar"),
Paths.get(seatunnelHome, "connectors",
"connector-http.jar"),
+ Paths.get(seatunnelHome, "connectors",
"connector-clickhouse.jar"),
+ Paths.get(
+ seatunnelHome,
+ "plugins",
+ "connector-clickhouse",
+ "clickhouse-jdbc-driver.jar"),
+ Paths.get(
+ seatunnelHome,
+ "plugins",
+ "connector-clickhouse",
+ "clickhouse-jdbc-driver2.jar"),
+ Paths.get(seatunnelHome, "plugins", "connector-jdbc",
"mysql-jdbc-driver.jar"),
+ Paths.get(seatunnelHome, "plugins", "connector-jdbc",
"mysql-jdbc-driver2.jar"),
+ Paths.get(seatunnelHome, "plugins", "other",
"common-dependency.jar"),
+ Paths.get(seatunnelHome, "plugins", "other",
"common-dependency2.jar"),
+ Paths.get(seatunnelHome, "plugins",
"common-dependency3.jar"),
+ Paths.get(
+ seatunnelHome,
+ "plugins",
+ "otherWithLib",
+ "lib",
+ "common-dependency3.jar"),
Paths.get(seatunnelHome, "connectors",
"connector-kafka.jar"),
Paths.get(seatunnelHome, "connectors",
"connector-kafka-alcs.jar"),
Paths.get(seatunnelHome, "connectors",
"connector-kafka-blcs.jar"),
@@ -84,7 +109,7 @@ class SeaTunnelSourcePluginDiscoveryTest {
// The file is created under target directory.
for (Path pluginJar : pluginJars) {
- Files.createFile(pluginJar);
+ FileUtils.createNewFile(pluginJar.toString());
}
}
@@ -180,6 +205,116 @@ class SeaTunnelSourcePluginDiscoveryTest {
.collect(Collectors.toList()));
}
+    /**
+     * Verifies that the connector jar, the connector specific dependency directory and the
+     * common dependencies are returned for JDBC and ClickHouse, both together and separately.
+     */
+    @Test
+    public void testGetPluginDependencies() throws MalformedURLException {
+        PluginIdentifier jdbc =
+                PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "JDBC");
+        PluginIdentifier clickhouse =
+                PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "ClickHouse");
+        SeaTunnelSourcePluginDiscovery discovery = new SeaTunnelSourcePluginDiscovery();
+
+        // Both connectors: shared dependencies must appear only once, in sorted order.
+        Assertions.assertIterableEquals(
+                pathsUnderHome(
+                        "/connectors/connector-clickhouse.jar",
+                        "/connectors/connector-jdbc-release-1.1.jar",
+                        "/plugins/common-dependency3.jar",
+                        "/plugins/connector-clickhouse/clickhouse-jdbc-driver.jar",
+                        "/plugins/connector-clickhouse/clickhouse-jdbc-driver2.jar",
+                        "/plugins/connector-jdbc/mysql-jdbc-driver.jar",
+                        "/plugins/connector-jdbc/mysql-jdbc-driver2.jar",
+                        "/plugins/other/common-dependency.jar",
+                        "/plugins/other/common-dependency2.jar",
+                        "/plugins/otherWithLib/lib/common-dependency3.jar"),
+                resolveJarPaths(discovery, jdbc, clickhouse));
+
+        // JDBC only: nothing from plugins/connector-clickhouse may be included.
+        Assertions.assertIterableEquals(
+                pathsUnderHome(
+                        "/connectors/connector-jdbc-release-1.1.jar",
+                        "/plugins/common-dependency3.jar",
+                        "/plugins/connector-jdbc/mysql-jdbc-driver.jar",
+                        "/plugins/connector-jdbc/mysql-jdbc-driver2.jar",
+                        "/plugins/other/common-dependency.jar",
+                        "/plugins/other/common-dependency2.jar",
+                        "/plugins/otherWithLib/lib/common-dependency3.jar"),
+                resolveJarPaths(discovery, jdbc));
+
+        // ClickHouse only: nothing from plugins/connector-jdbc may be included.
+        Assertions.assertIterableEquals(
+                pathsUnderHome(
+                        "/connectors/connector-clickhouse.jar",
+                        "/plugins/common-dependency3.jar",
+                        "/plugins/connector-clickhouse/clickhouse-jdbc-driver.jar",
+                        "/plugins/connector-clickhouse/clickhouse-jdbc-driver2.jar",
+                        "/plugins/other/common-dependency.jar",
+                        "/plugins/other/common-dependency2.jar",
+                        "/plugins/otherWithLib/lib/common-dependency3.jar"),
+                resolveJarPaths(discovery, clickhouse));
+    }
+
+    /** Resolves the jar and dependency URLs of the given plugins as local file paths. */
+    private List<String> resolveJarPaths(
+            SeaTunnelSourcePluginDiscovery discovery, PluginIdentifier... plugins) {
+        return discovery.getPluginJarAndDependencyPaths(Lists.newArrayList(plugins)).stream()
+                .map(
+                        url -> {
+                            try {
+                                return new File(url.toURI()).getPath();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+
+    /** Turns paths relative to {@code seatunnelHome} into the expected path strings. */
+    private List<String> pathsUnderHome(String... relativePaths) {
+        return Stream.of(relativePaths)
+                .map(relativePath -> Paths.get(seatunnelHome, relativePath).toString())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Verifies that, for the jars created by this test, the connector independent lookup returns
+     * exactly the jar placed under {@code plugins/otherWithLib/lib}.
+     */
+    @Test
+    public void testGetPluginsJarDependenciesWithoutConnectorDependency() {
+        List<Path> paths = Common.getPluginsJarDependenciesWithoutConnectorDependency();
+        Assertions.assertIterableEquals(
+                Collections.singletonList(
+                        Paths.get(
+                                seatunnelHome, "/plugins/otherWithLib/lib/common-dependency3.jar")),
+                paths);
+    }
+
@AfterEach
public void after() throws IOException {
for (Path pluginJar : pluginJars) {
diff --git
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
index f4c561c1bb..99959263a1 100644
---
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
+++
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
@@ -19,6 +19,8 @@ seatunnel.source.HttpBase = connector-http
seatunnel.sink.HttpBase = connector-http
seatunnel.source.HttpJira = connector-http-jira
seatunnel.sink.HttpJira = connector-http-jira
+seatunnel.source.Clickhouse = connector-clickhouse
+seatunnel.sink.Clickhouse = connector-clickhouse
seatunnel.source.Kafka = connector-kafka
seatunnel.sink.Kafka = connector-kafka
seatunnel.source.Kafka-Alcs = connector-kafka-alcs
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 842ee173dd..472928a3a0 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.Serializable;
+import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -49,6 +50,19 @@ import java.util.stream.Collectors;
@Slf4j
public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends
Serializable>
implements BaseSourceFunction<T> {
+
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 9.
+        DriverManager.getDrivers();
+    }
+
protected static final long SLEEP_TIME_INTERVAL = 5L;
protected final SeaTunnelSource<T, SplitT, StateT> source;
protected final Map<Integer, List<byte[]>> restoredState;
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index ed794a5b6c..f3e768930c 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -44,6 +45,19 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
public class ParallelSource<T, SplitT extends SourceSplit, StateT extends
Serializable>
implements BaseSourceFunction<T> {
+
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 9.
+        DriverManager.getDrivers();
+    }
+
private static final Logger LOG =
LoggerFactory.getLogger(ParallelSource.class);
protected final SeaTunnelSource<T, SplitT, StateT> source;
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 022ef3224c..f961b67b92 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
+import java.sql.DriverManager;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -46,6 +47,18 @@ import java.util.stream.Collectors;
public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT>
implements Sink<InputT, CommitWrapper<CommT>,
FlinkWriterState<WriterStateT>, GlobalCommT> {
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 9.
+        DriverManager.getDrivers();
+    }
+
private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT,
GlobalCommT> sink;
private final List<CatalogTable> catalogTables;
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
index 7868e6d3ef..58654e1dd1 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.Serializable;
+import java.sql.DriverManager;
/**
* The source implementation of {@link Source}, used for proxy all {@link SeaTunnelSource} in flink.
@@ -48,6 +49,18 @@ public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends Serializ
implements Source<SeaTunnelRow, SplitWrapper<SplitT>, EnumStateT>,
ResultTypeQueryable<SeaTunnelRow> {
+ static {
+ // Load DriverManager first to avoid deadlock between DriverManager's
+ // static initialization block and specific driver class's static
+ // initialization block when two different driver classes are loading
+ // concurrently using Class.forName while DriverManager is uninitialized
+ // before.
+ //
+ // This could happen in JDK 8 but not above as driver loading has been
+ // moved out of DriverManager's static initialization block since JDK 9.
+ DriverManager.getDrivers();
+ }
+
private final SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source;
private final Config envConfig;
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
index f8ed25c044..28a8f1d952 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
@@ -30,9 +30,22 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import java.io.IOException;
+import java.sql.DriverManager;
public class SparkDataWriterFactory<CommitInfoT, StateT> implements
DataWriterFactory<InternalRow> {
+ static {
+ // Load DriverManager first to avoid deadlock between DriverManager's
+ // static initialization block and specific driver class's static
+ // initialization block when two different driver classes are loading
+ // concurrently using Class.forName while DriverManager is uninitialized
+ // before.
+ //
+ // This could happen in JDK 8 but not above as driver loading has been
+ // moved out of DriverManager's static initialization block since JDK 9.
+ DriverManager.getDrivers();
+ }
+
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
private final CatalogTable[] catalogTables;
private final String jobId;
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
index 59f4c9d0cb..67e8ca8df4 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
@@ -34,6 +34,7 @@ import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
import java.io.IOException;
+import java.sql.DriverManager;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -43,6 +44,18 @@ import java.util.stream.Collectors;
public class SeaTunnelBatchWrite<StateT, CommitInfoT, AggregatedCommitInfoT>
implements BatchWrite, StreamingWrite {
+ static {
+ // Load DriverManager first to avoid deadlock between DriverManager's
+ // static initialization block and specific driver class's static
+ // initialization block when two different driver classes are loading
+ // concurrently using Class.forName while DriverManager is uninitialized
+ // before.
+ //
+ // This could happen in JDK 8 but not above as driver loading has been
+ // moved out of DriverManager's static initialization block since JDK 9.
+ DriverManager.getDrivers();
+ }
+
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink;
private final SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>
aggregatedCommitter;
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
index 7d7537222e..cfd8605f74 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
@@ -31,10 +31,23 @@ import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
import java.io.IOException;
+import java.sql.DriverManager;
public class SeaTunnelSparkDataWriterFactory<CommitInfoT, StateT>
implements DataWriterFactory, StreamingDataWriterFactory {
+ static {
+ // Load DriverManager first to avoid deadlock between DriverManager's
+ // static initialization block and specific driver class's static
+ // initialization block when two different driver classes are loading
+ // concurrently using Class.forName while DriverManager is uninitialized
+ // before.
+ //
+ // This could happen in JDK 8 but not above as driver loading has been
+ // moved out of DriverManager's static initialization block since JDK 9.
+ DriverManager.getDrivers();
+ }
+
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
private final CatalogTable[] catalogTables;
private final String jobId;