This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2dd347673 [Core][Starter] Change jar connector load logic (#2193)
2dd347673 is described below
commit 2dd3476730f1e44251833a4b397b0eaccd979d43
Author: Hisoka <[email protected]>
AuthorDate: Tue Jul 19 23:15:01 2022 +0800
[Core][Starter] Change jar connector load logic (#2193)
---
.../seatunnel/common/utils/ReflectionUtils.java | 13 +++--
.../common/utils/ReflectionUtilsTest.java | 25 ++++----
.../apache/seatunnel/spark/jdbc/source/Jdbc.scala | 4 +-
.../util/{HiveDialet.scala => HiveDialect.scala} | 2 +-
.../execution/AbstractPluginExecuteProcessor.java | 15 +++++
.../flink/execution/SinkExecuteProcessor.java | 2 +-
.../flink/execution/SourceExecuteProcessor.java | 2 +-
.../plugin/discovery/AbstractPluginDiscovery.java | 67 +++++++++++++++++-----
...very.java => FlinkAbstractPluginDiscovery.java} | 31 +++++-----
.../discovery/flink/FlinkSinkPluginDiscovery.java | 3 +-
.../flink/FlinkSourcePluginDiscovery.java | 3 +-
.../flink/FlinkTransformPluginDiscovery.java | 3 +-
.../seatunnel/SeaTunnelSinkPluginDiscovery.java | 7 +++
.../seatunnel/SeaTunnelSourcePluginDiscovery.java | 8 +++
14 files changed, 130 insertions(+), 55 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
index 3edd9cccf..f186f0486 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
@@ -28,7 +28,7 @@ public class ReflectionUtils {
Optional<Method> method = Optional.empty();
Method m;
- for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
+ for (; clazz != null; clazz = clazz.getSuperclass()) {
try {
m = clazz.getDeclaredMethod(methodName, parameterTypes);
m.setAccessible(true);
@@ -80,9 +80,14 @@ public class ReflectionUtils {
public static Object invoke(
Object object, String methodName, Class<?>[] argTypes, Object[]
args) {
try {
- Method method = object.getClass().getDeclaredMethod(methodName,
argTypes);
- method.setAccessible(true);
- return method.invoke(object, args);
+ Optional<Method> method = getDeclaredMethod(object.getClass(),
methodName, argTypes);
+ if (method.isPresent()) {
+ method.get().setAccessible(true);
+ return method.get().invoke(object, args);
+ } else {
+ throw new NoSuchMethodException(String.format("method invoke
failed, no such method '%s' in '%s'",
+ methodName, object.getClass()));
+ }
} catch (NoSuchMethodException | InvocationTargetException |
IllegalAccessException e) {
throw new RuntimeException("method invoke failed", e);
}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ReflectionUtilsTest.java
similarity index 55%
copy from
seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
copy to
seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ReflectionUtilsTest.java
index a2185a4ae..422c70eef 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
+++
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ReflectionUtilsTest.java
@@ -15,19 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.plugin.discovery.flink;
+package org.apache.seatunnel.common.utils;
-import org.apache.seatunnel.flink.BaseFlinkSink;
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.junit.Assert;
+import org.junit.Test;
-public class FlinkSinkPluginDiscovery extends
AbstractPluginDiscovery<BaseFlinkSink> {
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
- public FlinkSinkPluginDiscovery() {
- super("flink");
- }
+public class ReflectionUtilsTest {
+
+ @Test
+ public void testInvoke() throws MalformedURLException {
+ ReflectionUtils.invoke(new String[]{}, "toString");
- @Override
- protected Class<BaseFlinkSink> getPluginBaseClass() {
- return BaseFlinkSink.class;
+ URLClassLoader classLoader = new URLClassLoader(new URL[]{},
Thread.currentThread().getContextClassLoader());
+ ReflectionUtils.invoke(classLoader, "addURL", new URL("file:///test"));
+ Assert.assertArrayEquals(classLoader.getURLs(), new URL[]{new
URL("file:///test")});
}
+
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
index df3d8c983..6a31beefd 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
@@ -22,7 +22,7 @@ import org.apache.seatunnel.common.config.{CheckResult,
TypesafeConfigUtils}
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
-import org.apache.seatunnel.spark.jdbc.source.util.HiveDialet
+import org.apache.seatunnel.spark.jdbc.source.util.HiveDialect
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.{DataFrameReader, Dataset, Row, SparkSession}
@@ -60,7 +60,7 @@ class Jdbc extends SparkBatchSource {
}
if (config.getString("url").startsWith("jdbc:hive2")) {
- JdbcDialects.registerDialect(new HiveDialet)
+ JdbcDialects.registerDialect(new HiveDialect)
}
reader
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialet.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialect.scala
similarity index 96%
rename from
seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialet.scala
rename to
seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialect.scala
index c686cdfba..81eb48bf6 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialet.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialect.scala
@@ -18,7 +18,7 @@ package org.apache.seatunnel.spark.jdbc.source.util
import org.apache.spark.sql.jdbc.JdbcDialect
-class HiveDialet extends JdbcDialect {
+class HiveDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = {
url.startsWith("jdbc:hive2")
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 6c3ebceb1..5e3178bfb 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.util.TableUtil;
@@ -30,8 +31,11 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.util.List;
import java.util.Optional;
+import java.util.function.BiConsumer;
public abstract class AbstractPluginExecuteProcessor<T> implements
PluginExecuteProcessor {
@@ -41,6 +45,17 @@ public abstract class AbstractPluginExecuteProcessor<T>
implements PluginExecute
protected static final String ENGINE_TYPE = "seatunnel";
protected static final String PLUGIN_NAME = "plugin_name";
+ protected final BiConsumer<ClassLoader, URL> addUrlToClassloader =
(classLoader, url) -> {
+ if
(classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
+ URLClassLoader c = (URLClassLoader)
ReflectionUtils.getField(classLoader, "inner").get();
+ ReflectionUtils.invoke(c, "addURL", url);
+ } else if (classLoader instanceof URLClassLoader) {
+ ReflectionUtils.invoke(classLoader, "addURL", url);
+ } else {
+ throw new RuntimeException("Unsupported classloader: " +
classLoader.getClass().getName());
+ }
+ };
+
protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
List<? extends Config>
pluginConfigs) {
this.flinkEnvironment = flinkEnvironment;
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index ef412213e..8de5d422d 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -52,7 +52,7 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
@Override
protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery(addUrlToClassloader);
List<URL> pluginJars = new ArrayList<>();
List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE,
sinkConfig.getString(PLUGIN_NAME));
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index fa0cb65eb..a1b31836f 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -101,7 +101,7 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
@Override
protected List<SeaTunnelSource> initializePlugins(List<? extends Config>
pluginConfigs) {
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new
SeaTunnelSourcePluginDiscovery();
+ SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new
SeaTunnelSourcePluginDiscovery(addUrlToClassloader);
List<SeaTunnelSource> sources = new ArrayList<>();
Set<URL> jars = new HashSet<>();
for (Config sourceConfig : pluginConfigs) {
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index ccd85bbf7..529049455 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.plugin.discovery;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
@@ -29,6 +30,8 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.FileFilter;
import java.net.MalformedURLException;
@@ -40,6 +43,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T>
{
@@ -47,8 +51,26 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractPluginDiscovery.class);
private final Path pluginDir;
+ /**
+ * Adds a jar URL to a classloader. Each engine should supply its own
logic for adding the URL to
+ * its own classloader.
+ */
+ private BiConsumer<ClassLoader, URL> addURLToClassLoader = (classLoader,
url) -> {
+ if (classLoader instanceof URLClassLoader) {
+ ReflectionUtils.invoke(classLoader, "addURL", url);
+ } else {
+ throw new UnsupportedOperationException("can't support custom load
jar");
+ }
+ };
+
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>>
pluginJarPath =
- new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+ new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+
+ public AbstractPluginDiscovery(String pluginSubDir,
BiConsumer<ClassLoader, URL> addURLToClassloader) {
+ this.pluginDir = Common.connectorJarDir(pluginSubDir);
+ this.addURLToClassLoader = addURLToClassloader;
+ LOGGER.info("Load {} Plugin from {}",
getPluginBaseClass().getSimpleName(), pluginDir);
+ }
public AbstractPluginDiscovery(String pluginSubDir) {
this.pluginDir = Common.connectorJarDir(pluginSubDir);
@@ -58,10 +80,10 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
@Override
public List<URL> getPluginJarPaths(List<PluginIdentifier>
pluginIdentifiers) {
return pluginIdentifiers.stream()
- .map(this::getPluginJarPath)
- .filter(Optional::isPresent)
- .map(Optional::get).distinct()
- .collect(Collectors.toList());
+ .map(this::getPluginJarPath)
+ .filter(Optional::isPresent)
+ .map(Optional::get).distinct()
+ .collect(Collectors.toList());
}
@Override
@@ -73,16 +95,35 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
@Override
public T createPluginInstance(PluginIdentifier pluginIdentifier) {
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ T pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
+ if (pluginInstance != null) {
+ LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
+ return pluginInstance;
+ }
Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
- ClassLoader classLoader;
- // if the plugin jar not exist in plugin dir, will load from classpath.
+ // If the plugin is not found on the classpath, load it from the plugin dir.
if (pluginJarPath.isPresent()) {
- LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier,
pluginJarPath.get());
- classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()},
Thread.currentThread().getContextClassLoader());
- } else {
- LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
- classLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ // Use the current thread's classloader to avoid errors caused by
different classloaders loading the same class.
+ this.addURLToClassLoader.accept(classLoader,
pluginJarPath.get());
+ } catch (Exception e) {
+ LOGGER.warn("can't load jar use current thread classloader,
use URLClassLoader instead now." +
+ " message: " + e.getMessage());
+ classLoader = new URLClassLoader(new
URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
+ }
+ pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
+ if (pluginInstance != null) {
+ LOGGER.info("Load plugin: {} from path: {} use classloader:
{}",
+ pluginIdentifier, pluginJarPath.get(),
classLoader.getClass().getName());
+ return pluginInstance;
+ }
}
+ throw new RuntimeException("Plugin " + pluginIdentifier + " not
found.");
+ }
+
+ @Nullable
+ private T loadPluginInstance(PluginIdentifier pluginIdentifier,
ClassLoader classLoader) {
ServiceLoader<T> serviceLoader =
ServiceLoader.load(getPluginBaseClass(), classLoader);
for (T t : serviceLoader) {
if (t instanceof Plugin) {
@@ -101,7 +142,7 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
throw new UnsupportedOperationException("Plugin instance: " +
t + " is not supported.");
}
}
- throw new RuntimeException("Plugin " + pluginIdentifier + " not
found.");
+ return null;
}
/**
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkAbstractPluginDiscovery.java
similarity index 52%
copy from
seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
copy to
seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkAbstractPluginDiscovery.java
index cc77b49a0..a9956fd3f 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkAbstractPluginDiscovery.java
@@ -17,27 +17,24 @@
package org.apache.seatunnel.plugin.discovery.flink;
-import org.apache.seatunnel.flink.BaseFlinkTransform;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
+import java.net.URLClassLoader;
-public class FlinkTransformPluginDiscovery extends
AbstractPluginDiscovery<BaseFlinkTransform> {
+public abstract class FlinkAbstractPluginDiscovery<T> extends
AbstractPluginDiscovery<T> {
- public FlinkTransformPluginDiscovery() {
- super("flink");
+ public FlinkAbstractPluginDiscovery(String pluginSubDir) {
+ super(pluginSubDir, (classLoader, url) -> {
+ if
(classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
+ URLClassLoader c = (URLClassLoader)
ReflectionUtils.getField(classLoader, "inner").get();
+ ReflectionUtils.invoke(c, "addURL", url);
+ } else if (classLoader instanceof URLClassLoader) {
+ ReflectionUtils.invoke(classLoader, "addURL", url);
+ } else {
+ throw new RuntimeException("Unsupported classloader: " +
classLoader.getClass().getName());
+ }
+ });
}
- @Override
- public List<URL> getPluginJarPaths(List<PluginIdentifier>
pluginIdentifiers) {
- return new ArrayList<>();
- }
-
- @Override
- protected Class<BaseFlinkTransform> getPluginBaseClass() {
- return BaseFlinkTransform.class;
- }
}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
index a2185a4ae..8c973cd0e 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
@@ -18,9 +18,8 @@
package org.apache.seatunnel.plugin.discovery.flink;
import org.apache.seatunnel.flink.BaseFlinkSink;
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-public class FlinkSinkPluginDiscovery extends
AbstractPluginDiscovery<BaseFlinkSink> {
+public class FlinkSinkPluginDiscovery extends
FlinkAbstractPluginDiscovery<BaseFlinkSink> {
public FlinkSinkPluginDiscovery() {
super("flink");
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
index 24ff89e03..fd9e41564 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
@@ -18,9 +18,8 @@
package org.apache.seatunnel.plugin.discovery.flink;
import org.apache.seatunnel.flink.BaseFlinkSource;
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-public class FlinkSourcePluginDiscovery extends
AbstractPluginDiscovery<BaseFlinkSource> {
+public class FlinkSourcePluginDiscovery extends
FlinkAbstractPluginDiscovery<BaseFlinkSource> {
public FlinkSourcePluginDiscovery() {
super("flink");
}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
index cc77b49a0..12a91d088 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
@@ -18,14 +18,13 @@
package org.apache.seatunnel.plugin.discovery.flink;
import org.apache.seatunnel.flink.BaseFlinkTransform;
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
-public class FlinkTransformPluginDiscovery extends
AbstractPluginDiscovery<BaseFlinkTransform> {
+public class FlinkTransformPluginDiscovery extends
FlinkAbstractPluginDiscovery<BaseFlinkTransform> {
public FlinkTransformPluginDiscovery() {
super("flink");
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
index d3286c544..e2ca9427e 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
@@ -20,12 +20,19 @@ package org.apache.seatunnel.plugin.discovery.seatunnel;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import java.net.URL;
+import java.util.function.BiConsumer;
+
public class SeaTunnelSinkPluginDiscovery extends
AbstractPluginDiscovery<SeaTunnelSink> {
public SeaTunnelSinkPluginDiscovery() {
super("seatunnel");
}
+ public SeaTunnelSinkPluginDiscovery(BiConsumer<ClassLoader, URL>
addURLToClassLoader) {
+ super("seatunnel", addURLToClassLoader);
+ }
+
@Override
protected Class<SeaTunnelSink> getPluginBaseClass() {
return SeaTunnelSink.class;
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
index 8618e0378..f9da2a0a9 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
@@ -20,11 +20,19 @@ package org.apache.seatunnel.plugin.discovery.seatunnel;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import java.net.URL;
+import java.util.function.BiConsumer;
+
public class SeaTunnelSourcePluginDiscovery extends
AbstractPluginDiscovery<SeaTunnelSource> {
+
public SeaTunnelSourcePluginDiscovery() {
super("seatunnel");
}
+ public SeaTunnelSourcePluginDiscovery(BiConsumer<ClassLoader, URL>
addURLToClassLoader) {
+ super("seatunnel", addURLToClassLoader);
+ }
+
@Override
protected Class<SeaTunnelSource> getPluginBaseClass() {
return SeaTunnelSource.class;