This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e93fdf74b fix same source and sink registerplugin librarycache error
(#2015)
e93fdf74b is described below
commit e93fdf74bdeaad3f80d472f315f5917c7754cbf4
Author: 沫 <[email protected]>
AuthorDate: Wed Jun 15 14:13:41 2022 +0800
fix same source and sink registerplugin librarycache error (#2015)
Co-authored-by: zhoutao.tobeone <[email protected]>
---
.../org/apache/seatunnel/apis/base/env/RuntimeEnv.java | 4 ++--
.../java/org/apache/seatunnel/flink/FlinkEnvironment.java | 3 ++-
.../java/org/apache/seatunnel/spark/SparkEnvironment.java | 4 ++--
.../apache/seatunnel/core/base/config/PluginFactory.java | 14 ++++++++------
4 files changed, 14 insertions(+), 11 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java
index e8532a208..f0cf54447 100644
---
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java
+++
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.net.URL;
-import java.util.List;
+import java.util.Set;
/**
* engine related runtime environment
@@ -42,6 +42,6 @@ public interface RuntimeEnv {
JobMode getJobMode();
- void registerPlugin(List<URL> pluginPaths);
+ void registerPlugin(Set<URL> pluginPaths);
}
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 5df4a8be0..2566cc5d2 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -49,6 +49,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
public class FlinkEnvironment implements RuntimeEnv {
@@ -120,7 +121,7 @@ public class FlinkEnvironment implements RuntimeEnv {
}
@Override
- public void registerPlugin(List<URL> pluginPaths) {
+ public void registerPlugin(Set<URL> pluginPaths) {
LOGGER.info("register plugins :" + pluginPaths);
Configuration configuration;
try {
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index 3e2d5e91b..bb0bdd9b0 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -38,7 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
-import java.util.List;
+import java.util.Set;
public class SparkEnvironment implements RuntimeEnv {
@@ -89,7 +89,7 @@ public class SparkEnvironment implements RuntimeEnv {
}
@Override
- public void registerPlugin(List<URL> pluginPaths) {
+ public void registerPlugin(Set<URL> pluginPaths) {
LOGGER.info("register plugins :" + pluginPaths);
// TODO we use --jar parameter to support submit multi-jar in spark cluster at now. Refactor it to
// support submit multi-jar in code or remove this logic.
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
index 24d4f9630..0dbdb0ef1 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
@@ -44,6 +44,7 @@ import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -51,6 +52,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -68,7 +70,7 @@ public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
private static final String PLUGIN_NAME_KEY = "plugin_name";
private static final String PLUGIN_MAPPING_FILE =
"plugin-mapping.properties";
- private final List<URL> pluginJarPaths;
+ private final Set<URL> pluginJarPaths;
private final ClassLoader defaultClassLoader;
static {
@@ -93,17 +95,17 @@ public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
this.defaultClassLoader =
initClassLoaderWithPaths(this.pluginJarPaths);
}
- private ClassLoader initClassLoaderWithPaths(List<URL> pluginJarPaths) {
+ private ClassLoader initClassLoaderWithPaths(Set<URL> pluginJarPaths) {
return new URLClassLoader(pluginJarPaths.toArray(new URL[0]),
Thread.currentThread().getContextClassLoader());
}
@Nonnull
- private List<URL> searchPluginJar() {
+ private Set<URL> searchPluginJar() {
File pluginDir =
Common.connectorJarDir(this.engineType.getEngine()).toFile();
if (!pluginDir.exists() || pluginDir.listFiles() == null) {
- return new ArrayList<>();
+ return new HashSet<>();
}
Config pluginMapping = ConfigFactory
.parseFile(new File(getPluginMappingPath()))
@@ -140,10 +142,10 @@ public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
});
return pluginList.stream();
- }).collect(Collectors.toList());
+ }).collect(Collectors.toSet());
}
- public List<URL> getPluginJarPaths() {
+ public Set<URL> getPluginJarPaths() {
return this.pluginJarPaths;
}