This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3dd7dc0e4 [Core][Plugin] Fix same plugin can't create twice error. (#2132)
3dd7dc0e4 is described below

commit 3dd7dc0e4cdf77bb5a039d3848365561cfac0f92
Author: Hisoka <[email protected]>
AuthorDate: Tue Jul 5 19:58:17 2022 +0800

    [Core][Plugin] Fix same plugin can't create twice error. (#2132)
    
    * [Core][Plugin] fix same plugin can't create twice error.
    
    * [Core][Plugin] change create plugin instance method name to createPluginInstance.
---
 .../core/flink/config/FlinkExecutionContext.java   |  6 +-
 .../core/spark/config/SparkExecutionContext.java   |  6 +-
 .../flink/execution/SinkExecuteProcessor.java      |  2 +-
 .../flink/execution/SourceExecuteProcessor.java    |  2 +-
 .../flink/execution/TransformExecuteProcessor.java |  2 +-
 .../spark/execution/SinkExecuteProcessor.java      |  2 +-
 .../spark/execution/SourceExecuteProcessor.java    |  2 +-
 .../spark/execution/TransformExecuteProcessor.java |  2 +-
 .../plugin/discovery/AbstractPluginDiscovery.java  | 71 +++++++++-------------
 .../plugin/discovery/PluginDiscovery.java          |  2 +-
 10 files changed, 43 insertions(+), 54 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
index 773feb870..78fa9b71d 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
@@ -63,7 +63,7 @@ public class FlinkExecutionContext extends 
AbstractExecutionContext<FlinkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(engineType, pluginType, 
pluginConfig.getString("plugin_name"));
-                BaseSource<FlinkEnvironment> pluginInstance = 
flinkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSource<FlinkEnvironment> pluginInstance = 
flinkSourcePluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
@@ -77,7 +77,7 @@ public class FlinkExecutionContext extends 
AbstractExecutionContext<FlinkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(engineType, pluginType, 
pluginConfig.getString("plugin_name"));
-                BaseTransform<FlinkEnvironment> pluginInstance = 
flinkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseTransform<FlinkEnvironment> pluginInstance = 
flinkTransformPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
@@ -91,7 +91,7 @@ public class FlinkExecutionContext extends 
AbstractExecutionContext<FlinkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(engineType, pluginType, 
pluginConfig.getString("plugin_name"));
-                BaseSink<FlinkEnvironment> pluginInstance = 
flinkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSink<FlinkEnvironment> pluginInstance = 
flinkSinkPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
diff --git 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
index 7effd19b3..80fd61adc 100644
--- 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
+++ 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
@@ -62,7 +62,7 @@ public class SparkExecutionContext extends 
AbstractExecutionContext<SparkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(engineType, pluginType, 
pluginConfig.getString("plugin_name"));
-                BaseSource<SparkEnvironment> pluginInstance = 
sparkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSource<SparkEnvironment> pluginInstance = 
sparkSourcePluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
@@ -76,7 +76,7 @@ public class SparkExecutionContext extends 
AbstractExecutionContext<SparkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(engineType, pluginType, 
pluginConfig.getString("plugin_name"));
-                BaseTransform<SparkEnvironment> pluginInstance = 
sparkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseTransform<SparkEnvironment> pluginInstance = 
sparkTransformPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
@@ -90,7 +90,7 @@ public class SparkExecutionContext extends 
AbstractExecutionContext<SparkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(engineType, pluginType, 
pluginConfig.getString("plugin_name"));
-                BaseSink<SparkEnvironment> pluginInstance = 
sparkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSink<SparkEnvironment> pluginInstance = 
sparkSinkPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 4744fdf7e..04eee536b 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -58,7 +58,7 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
             PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, 
sinkConfig.getString(PLUGIN_NAME));
             
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable> seaTunnelSink =
-                sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+                sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
             seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
             return seaTunnelSink;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index b097c5a8c..6ea19fb9a 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -106,7 +106,7 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(
                 ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
             
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSource seaTunnelSource = 
sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+            SeaTunnelSource seaTunnelSource = 
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
             seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
             if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index ab0200e37..c4c13b852 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -51,7 +51,7 @@ public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<Fl
             .map(transformConfig -> {
                 PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, 
transformConfig.getString(PLUGIN_NAME));
                 
pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-                FlinkStreamTransform pluginInstance = (FlinkStreamTransform) 
transformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                FlinkStreamTransform pluginInstance = (FlinkStreamTransform) 
transformPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(transformConfig);
                 pluginInstance.prepare(flinkEnvironment);
                 return pluginInstance;
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 2e0096a47..e932587ff 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -54,7 +54,7 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
         List<SeaTunnelSink<?, ?, ?, ?>> sinks = 
pluginConfigs.stream().map(sinkConfig -> {
             PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, 
sinkConfig.getString(PLUGIN_NAME));
             
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = 
sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+            SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
             seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
             return seaTunnelSink;
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index ba953ebda..d436d74f0 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -70,7 +70,7 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(
                 ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
             
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSource<?, ?, ?> seaTunnelSource = 
sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+            SeaTunnelSource<?, ?, ?> seaTunnelSource = 
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
             seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
             sources.add(seaTunnelSource);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index faa2bacea..945259340 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -51,7 +51,7 @@ public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<Ba
             .map(transformConfig -> {
                 PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, 
transformConfig.getString(PLUGIN_NAME));
                 
pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-                BaseSparkTransform pluginInstance = 
transformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSparkTransform pluginInstance = 
transformPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(transformConfig);
                 pluginInstance.prepare(sparkEnvironment);
                 return pluginInstance;
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index edbd78812..ccd85bbf7 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -47,8 +47,6 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractPluginDiscovery.class);
     private final Path pluginDir;
 
-    protected final ConcurrentHashMap<PluginIdentifier, Optional<T>> 
pluginInstanceMap =
-        new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
     protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> 
pluginJarPath =
         new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
 
@@ -69,18 +67,41 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
     @Override
     public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
         return pluginIdentifiers.stream()
-            .map(this::getPluginInstance).distinct()
+            .map(this::createPluginInstance).distinct()
             .collect(Collectors.toList());
     }
 
     @Override
-    public T getPluginInstance(PluginIdentifier pluginIdentifier) {
-        Optional<T> pluginInstance = pluginInstanceMap
-            .computeIfAbsent(pluginIdentifier, this::createPluginInstance);
-        if (!pluginInstance.isPresent()) {
-            throw new IllegalArgumentException("Can't find plugin: " + 
pluginIdentifier);
+    public T createPluginInstance(PluginIdentifier pluginIdentifier) {
+        Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
+        ClassLoader classLoader;
+        // if the plugin jar not exist in plugin dir, will load from classpath.
+        if (pluginJarPath.isPresent()) {
+            LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier, 
pluginJarPath.get());
+            classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, 
Thread.currentThread().getContextClassLoader());
+        } else {
+            LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
+            classLoader = Thread.currentThread().getContextClassLoader();
         }
-        return pluginInstance.get();
+        ServiceLoader<T> serviceLoader = 
ServiceLoader.load(getPluginBaseClass(), classLoader);
+        for (T t : serviceLoader) {
+            if (t instanceof Plugin) {
+                // old api
+                Plugin<?> pluginInstance = (Plugin<?>) t;
+                if 
(StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), 
pluginIdentifier.getPluginName())) {
+                    return (T) pluginInstance;
+                }
+            } else if (t instanceof PluginIdentifierInterface) {
+                // new api
+                PluginIdentifierInterface pluginIdentifierInstance = 
(PluginIdentifierInterface) t;
+                if 
(StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), 
pluginIdentifier.getPluginName())) {
+                    return (T) pluginIdentifierInstance;
+                }
+            } else {
+                throw new UnsupportedOperationException("Plugin instance: " + 
t + " is not supported.");
+            }
+        }
+        throw new RuntimeException("Plugin " + pluginIdentifier + " not 
found.");
     }
 
     /**
@@ -146,36 +167,4 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
             return Optional.empty();
         }
     }
-
-    private Optional<T> createPluginInstance(PluginIdentifier 
pluginIdentifier) {
-        Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
-        ClassLoader classLoader;
-        // if the plugin jar not exist in plugin dir, will load from classpath.
-        if (pluginJarPath.isPresent()) {
-            LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier, 
pluginJarPath.get());
-            classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, 
Thread.currentThread().getContextClassLoader());
-        } else {
-            LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
-            classLoader = Thread.currentThread().getContextClassLoader();
-        }
-        ServiceLoader<T> serviceLoader = 
ServiceLoader.load(getPluginBaseClass(), classLoader);
-        for (T t : serviceLoader) {
-            if (t instanceof Plugin) {
-                // old api
-                Plugin<?> pluginInstance = (Plugin<?>) t;
-                if 
(StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), 
pluginIdentifier.getPluginName())) {
-                    return Optional.of((T) pluginInstance);
-                }
-            } else if (t instanceof PluginIdentifierInterface) {
-                // new api
-                PluginIdentifierInterface pluginIdentifierInstance = 
(PluginIdentifierInterface) t;
-                if 
(StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), 
pluginIdentifier.getPluginName())) {
-                    return Optional.of((T) pluginIdentifierInstance);
-                }
-            } else {
-                throw new UnsupportedOperationException("Plugin instance: " + 
t + " is not supported.");
-            }
-        }
-        return Optional.empty();
-    }
 }
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index cdc85860d..8a571f92c 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -58,7 +58,7 @@ public interface PluginDiscovery<T> {
      * @param pluginIdentifier plugin identifier.
      * @return plugin instance. If not found, throw IllegalArgumentException.
      */
-    T getPluginInstance(PluginIdentifier pluginIdentifier);
+    T createPluginInstance(PluginIdentifier pluginIdentifier);
 
     /**
      * Get all plugin instances.

Reply via email to