This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2dd347673 [Core][Starter] Change jar connector load logic (#2193)
2dd347673 is described below

commit 2dd3476730f1e44251833a4b397b0eaccd979d43
Author: Hisoka <[email protected]>
AuthorDate: Tue Jul 19 23:15:01 2022 +0800

    [Core][Starter] Change jar connector load logic (#2193)
---
 .../seatunnel/common/utils/ReflectionUtils.java    | 13 +++--
 .../common/utils/ReflectionUtilsTest.java          | 25 ++++----
 .../apache/seatunnel/spark/jdbc/source/Jdbc.scala  |  4 +-
 .../util/{HiveDialet.scala => HiveDialect.scala}   |  2 +-
 .../execution/AbstractPluginExecuteProcessor.java  | 15 +++++
 .../flink/execution/SinkExecuteProcessor.java      |  2 +-
 .../flink/execution/SourceExecuteProcessor.java    |  2 +-
 .../plugin/discovery/AbstractPluginDiscovery.java  | 67 +++++++++++++++++-----
 ...very.java => FlinkAbstractPluginDiscovery.java} | 31 +++++-----
 .../discovery/flink/FlinkSinkPluginDiscovery.java  |  3 +-
 .../flink/FlinkSourcePluginDiscovery.java          |  3 +-
 .../flink/FlinkTransformPluginDiscovery.java       |  3 +-
 .../seatunnel/SeaTunnelSinkPluginDiscovery.java    |  7 +++
 .../seatunnel/SeaTunnelSourcePluginDiscovery.java  |  8 +++
 14 files changed, 130 insertions(+), 55 deletions(-)

diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
index 3edd9cccf..f186f0486 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
@@ -28,7 +28,7 @@ public class ReflectionUtils {
 
         Optional<Method> method = Optional.empty();
         Method m;
-        for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
+        for (; clazz != null; clazz = clazz.getSuperclass()) {
             try {
                 m = clazz.getDeclaredMethod(methodName, parameterTypes);
                 m.setAccessible(true);
@@ -80,9 +80,14 @@ public class ReflectionUtils {
     public static Object invoke(
             Object object, String methodName, Class<?>[] argTypes, Object[] 
args) {
         try {
-            Method method = object.getClass().getDeclaredMethod(methodName, 
argTypes);
-            method.setAccessible(true);
-            return method.invoke(object, args);
+            Optional<Method> method = getDeclaredMethod(object.getClass(), 
methodName, argTypes);
+            if (method.isPresent()) {
+                method.get().setAccessible(true);
+                return method.get().invoke(object, args);
+            } else {
+                throw new NoSuchMethodException(String.format("method invoke 
failed, no such method '%s' in '%s'",
+                        methodName, object.getClass()));
+            }
         } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
             throw new RuntimeException("method invoke failed", e);
         }
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ReflectionUtilsTest.java
similarity index 55%
copy from 
seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
copy to 
seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ReflectionUtilsTest.java
index a2185a4ae..422c70eef 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
+++ 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ReflectionUtilsTest.java
@@ -15,19 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.plugin.discovery.flink;
+package org.apache.seatunnel.common.utils;
 
-import org.apache.seatunnel.flink.BaseFlinkSink;
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class FlinkSinkPluginDiscovery extends 
AbstractPluginDiscovery<BaseFlinkSink> {
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
 
-    public FlinkSinkPluginDiscovery() {
-        super("flink");
-    }
+public class ReflectionUtilsTest {
+
+    @Test
+    public void testInvoke() throws MalformedURLException {
+        ReflectionUtils.invoke(new String[]{}, "toString");
 
-    @Override
-    protected Class<BaseFlinkSink> getPluginBaseClass() {
-        return BaseFlinkSink.class;
+        URLClassLoader classLoader = new URLClassLoader(new URL[]{}, 
Thread.currentThread().getContextClassLoader());
+        ReflectionUtils.invoke(classLoader, "addURL", new URL("file:///test"));
+        Assert.assertArrayEquals(classLoader.getURLs(), new URL[]{new 
URL("file:///test")});
     }
+
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
index df3d8c983..6a31beefd 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
@@ -22,7 +22,7 @@ import org.apache.seatunnel.common.config.{CheckResult, 
TypesafeConfigUtils}
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
-import org.apache.seatunnel.spark.jdbc.source.util.HiveDialet
+import org.apache.seatunnel.spark.jdbc.source.util.HiveDialect
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.{DataFrameReader, Dataset, Row, SparkSession}
 
@@ -60,7 +60,7 @@ class Jdbc extends SparkBatchSource {
     }
 
     if (config.getString("url").startsWith("jdbc:hive2")) {
-      JdbcDialects.registerDialect(new HiveDialet)
+      JdbcDialects.registerDialect(new HiveDialect)
     }
 
     reader
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialet.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialect.scala
similarity index 96%
rename from 
seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialet.scala
rename to 
seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialect.scala
index c686cdfba..81eb48bf6 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialet.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialect.scala
@@ -18,7 +18,7 @@ package org.apache.seatunnel.spark.jdbc.source.util
 
 import org.apache.spark.sql.jdbc.JdbcDialect
 
-class HiveDialet extends JdbcDialect {
+class HiveDialect extends JdbcDialect {
   override def canHandle(url: String): Boolean = {
     url.startsWith("jdbc:hive2")
   }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 6c3ebceb1..5e3178bfb 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
 
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.util.TableUtil;
 
@@ -30,8 +31,11 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.BiConsumer;
 
 public abstract class AbstractPluginExecuteProcessor<T> implements 
PluginExecuteProcessor {
 
@@ -41,6 +45,17 @@ public abstract class AbstractPluginExecuteProcessor<T> 
implements PluginExecute
     protected static final String ENGINE_TYPE = "seatunnel";
     protected static final String PLUGIN_NAME = "plugin_name";
 
+    protected final BiConsumer<ClassLoader, URL> addUrlToClassloader = 
(classLoader, url) -> {
+        if 
(classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
+            URLClassLoader c = (URLClassLoader) 
ReflectionUtils.getField(classLoader, "inner").get();
+            ReflectionUtils.invoke(c, "addURL", url);
+        } else if (classLoader instanceof URLClassLoader) {
+            ReflectionUtils.invoke(classLoader, "addURL", url);
+        } else {
+            throw new RuntimeException("Unsupported classloader: " + 
classLoader.getClass().getName());
+        }
+    };
+
     protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                              List<? extends Config> 
pluginConfigs) {
         this.flinkEnvironment = flinkEnvironment;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index ef412213e..8de5d422d 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -52,7 +52,7 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
 
     @Override
     protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
-        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery(addUrlToClassloader);
         List<URL> pluginJars = new ArrayList<>();
         List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
             PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, 
sinkConfig.getString(PLUGIN_NAME));
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index fa0cb65eb..a1b31836f 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -101,7 +101,7 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
 
     @Override
     protected List<SeaTunnelSource> initializePlugins(List<? extends Config> 
pluginConfigs) {
-        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new 
SeaTunnelSourcePluginDiscovery();
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new 
SeaTunnelSourcePluginDiscovery(addUrlToClassloader);
         List<SeaTunnelSource> sources = new ArrayList<>();
         Set<URL> jars = new HashSet<>();
         for (Config sourceConfig : pluginConfigs) {
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index ccd85bbf7..529049455 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.plugin.discovery;
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
@@ -29,6 +30,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileFilter;
 import java.net.MalformedURLException;
@@ -40,6 +43,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> 
{
@@ -47,8 +51,26 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractPluginDiscovery.class);
     private final Path pluginDir;
 
+    /**
+     * Add jar url to classloader. The different engine should have different 
logic to add url into
+     * their own classloader
+     */
+    private BiConsumer<ClassLoader, URL> addURLToClassLoader = (classLoader, 
url) -> {
+        if (classLoader instanceof URLClassLoader) {
+            ReflectionUtils.invoke(classLoader, "addURL", url);
+        } else {
+            throw new UnsupportedOperationException("can't support custom load 
jar");
+        }
+    };
+
     protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> 
pluginJarPath =
-        new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+            new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+
+    public AbstractPluginDiscovery(String pluginSubDir, 
BiConsumer<ClassLoader, URL> addURLToClassloader) {
+        this.pluginDir = Common.connectorJarDir(pluginSubDir);
+        this.addURLToClassLoader = addURLToClassloader;
+        LOGGER.info("Load {} Plugin from {}", 
getPluginBaseClass().getSimpleName(), pluginDir);
+    }
 
     public AbstractPluginDiscovery(String pluginSubDir) {
         this.pluginDir = Common.connectorJarDir(pluginSubDir);
@@ -58,10 +80,10 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
     @Override
     public List<URL> getPluginJarPaths(List<PluginIdentifier> 
pluginIdentifiers) {
         return pluginIdentifiers.stream()
-            .map(this::getPluginJarPath)
-            .filter(Optional::isPresent)
-            .map(Optional::get).distinct()
-            .collect(Collectors.toList());
+                .map(this::getPluginJarPath)
+                .filter(Optional::isPresent)
+                .map(Optional::get).distinct()
+                .collect(Collectors.toList());
     }
 
     @Override
@@ -73,16 +95,35 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
 
     @Override
     public T createPluginInstance(PluginIdentifier pluginIdentifier) {
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        T pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
+        if (pluginInstance != null) {
+            LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
+            return pluginInstance;
+        }
         Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
-        ClassLoader classLoader;
-        // if the plugin jar not exist in plugin dir, will load from classpath.
+        // if the plugin jar not exist in classpath, will load from plugin dir.
         if (pluginJarPath.isPresent()) {
-            LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier, 
pluginJarPath.get());
-            classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, 
Thread.currentThread().getContextClassLoader());
-        } else {
-            LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
-            classLoader = Thread.currentThread().getContextClassLoader();
+            try {
+                // use current thread classloader to avoid different 
classloader load same class error.
+                this.addURLToClassLoader.accept(classLoader, 
pluginJarPath.get());
+            } catch (Exception e) {
+                LOGGER.warn("can't load jar use current thread classloader, 
use URLClassLoader instead now." +
+                        " message: " + e.getMessage());
+                classLoader = new URLClassLoader(new 
URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
+            }
+            pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
+            if (pluginInstance != null) {
+                LOGGER.info("Load plugin: {} from path: {} use classloader: 
{}",
+                        pluginIdentifier, pluginJarPath.get(), 
classLoader.getClass().getName());
+                return pluginInstance;
+            }
         }
+        throw new RuntimeException("Plugin " + pluginIdentifier + " not 
found.");
+    }
+
+    @Nullable
+    private T loadPluginInstance(PluginIdentifier pluginIdentifier, 
ClassLoader classLoader) {
         ServiceLoader<T> serviceLoader = 
ServiceLoader.load(getPluginBaseClass(), classLoader);
         for (T t : serviceLoader) {
             if (t instanceof Plugin) {
@@ -101,7 +142,7 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
                 throw new UnsupportedOperationException("Plugin instance: " + 
t + " is not supported.");
             }
         }
-        throw new RuntimeException("Plugin " + pluginIdentifier + " not 
found.");
+        return null;
     }
 
     /**
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkAbstractPluginDiscovery.java
similarity index 52%
copy from 
seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
copy to 
seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkAbstractPluginDiscovery.java
index cc77b49a0..a9956fd3f 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkAbstractPluginDiscovery.java
@@ -17,27 +17,24 @@
 
 package org.apache.seatunnel.plugin.discovery.flink;
 
-import org.apache.seatunnel.flink.BaseFlinkTransform;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
+import java.net.URLClassLoader;
 
-public class FlinkTransformPluginDiscovery extends 
AbstractPluginDiscovery<BaseFlinkTransform> {
+public abstract class FlinkAbstractPluginDiscovery<T> extends 
AbstractPluginDiscovery<T> {
 
-    public FlinkTransformPluginDiscovery() {
-        super("flink");
+    public FlinkAbstractPluginDiscovery(String pluginSubDir) {
+        super(pluginSubDir, (classLoader, url) -> {
+            if 
(classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
+                URLClassLoader c = (URLClassLoader) 
ReflectionUtils.getField(classLoader, "inner").get();
+                ReflectionUtils.invoke(c, "addURL", url);
+            } else if (classLoader instanceof URLClassLoader) {
+                ReflectionUtils.invoke(classLoader, "addURL", url);
+            } else {
+                throw new RuntimeException("Unsupported classloader: " + 
classLoader.getClass().getName());
+            }
+        });
     }
 
-    @Override
-    public List<URL> getPluginJarPaths(List<PluginIdentifier> 
pluginIdentifiers) {
-        return new ArrayList<>();
-    }
-
-    @Override
-    protected Class<BaseFlinkTransform> getPluginBaseClass() {
-        return BaseFlinkTransform.class;
-    }
 }
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
index a2185a4ae..8c973cd0e 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
@@ -18,9 +18,8 @@
 package org.apache.seatunnel.plugin.discovery.flink;
 
 import org.apache.seatunnel.flink.BaseFlinkSink;
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
 
-public class FlinkSinkPluginDiscovery extends 
AbstractPluginDiscovery<BaseFlinkSink> {
+public class FlinkSinkPluginDiscovery extends 
FlinkAbstractPluginDiscovery<BaseFlinkSink> {
 
     public FlinkSinkPluginDiscovery() {
         super("flink");
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
index 24ff89e03..fd9e41564 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
@@ -18,9 +18,8 @@
 package org.apache.seatunnel.plugin.discovery.flink;
 
 import org.apache.seatunnel.flink.BaseFlinkSource;
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
 
-public class FlinkSourcePluginDiscovery extends 
AbstractPluginDiscovery<BaseFlinkSource> {
+public class FlinkSourcePluginDiscovery extends 
FlinkAbstractPluginDiscovery<BaseFlinkSource> {
     public FlinkSourcePluginDiscovery() {
         super("flink");
     }
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
index cc77b49a0..12a91d088 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
@@ -18,14 +18,13 @@
 package org.apache.seatunnel.plugin.discovery.flink;
 
 import org.apache.seatunnel.flink.BaseFlinkTransform;
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
-public class FlinkTransformPluginDiscovery extends 
AbstractPluginDiscovery<BaseFlinkTransform> {
+public class FlinkTransformPluginDiscovery extends 
FlinkAbstractPluginDiscovery<BaseFlinkTransform> {
 
     public FlinkTransformPluginDiscovery() {
         super("flink");
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
index d3286c544..e2ca9427e 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
@@ -20,12 +20,19 @@ package org.apache.seatunnel.plugin.discovery.seatunnel;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
 
+import java.net.URL;
+import java.util.function.BiConsumer;
+
 public class SeaTunnelSinkPluginDiscovery extends 
AbstractPluginDiscovery<SeaTunnelSink> {
 
     public SeaTunnelSinkPluginDiscovery() {
         super("seatunnel");
     }
 
+    public SeaTunnelSinkPluginDiscovery(BiConsumer<ClassLoader, URL> 
addURLToClassLoader) {
+        super("seatunnel", addURLToClassLoader);
+    }
+
     @Override
     protected Class<SeaTunnelSink> getPluginBaseClass() {
         return SeaTunnelSink.class;
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
index 8618e0378..f9da2a0a9 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
@@ -20,11 +20,19 @@ package org.apache.seatunnel.plugin.discovery.seatunnel;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
 
+import java.net.URL;
+import java.util.function.BiConsumer;
+
 public class SeaTunnelSourcePluginDiscovery extends 
AbstractPluginDiscovery<SeaTunnelSource> {
+
     public SeaTunnelSourcePluginDiscovery() {
         super("seatunnel");
     }
 
+    public SeaTunnelSourcePluginDiscovery(BiConsumer<ClassLoader, URL> 
addURLToClassLoader) {
+        super("seatunnel", addURLToClassLoader);
+    }
+
     @Override
     protected Class<SeaTunnelSource> getPluginBaseClass() {
         return SeaTunnelSource.class;

Reply via email to