This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 5f79f45d Add plugin discovery module (#1881)
5f79f45d is described below
commit 5f79f45d22bbedc4a0f95ca1f251a217a7b4a23a
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun May 15 19:56:03 2022 +0800
Add plugin discovery module (#1881)
* Add plugin discovery module
* Remove PluginFactory
---
pom.xml | 1 +
.../org/apache/seatunnel/common/config/Common.java | 7 +-
seatunnel-core/seatunnel-core-base/pom.xml | 6 +
...nContext.java => AbstractExecutionContext.java} | 45 ++--
.../seatunnel/core/base/config/ConfigBuilder.java | 1 -
.../core/base/config/ExecutionFactory.java | 4 +-
.../seatunnel/core/base/config/PluginFactory.java | 241 ---------------------
.../flink/command/FlinkApiTaskExecuteCommand.java | 4 +-
.../core/flink/config/FlinkApiConfigChecker.java | 10 +-
.../core/flink/config/FlinkExecutionContext.java | 104 +++++++++
.../apache/seatunnel/core/spark/SparkStarter.java | 9 +-
.../spark/command/SparkTaskExecuteCommand.java | 4 +-
.../core/spark/config/SparkApiConfigChecker.java | 12 +-
.../core/spark/config/SparkExecutionContext.java | 103 +++++++++
.../pom.xml | 30 +--
.../plugin/discovery/AbstractPluginDiscovery.java | 170 +++++++++++++++
.../plugin/discovery/PluginDiscovery.java | 70 ++++++
.../plugin/discovery/PluginIdentifier.java | 89 ++++++++
.../discovery/flink/FlinkSinkPluginDiscovery.java | 33 +++
.../flink/FlinkSourcePluginDiscovery.java | 32 +++
.../flink/FlinkTransformPluginDiscovery.java | 42 ++++
.../discovery/spark/SparkSinkPluginDiscovery.java | 33 +++
.../spark/SparkSourcePluginDiscovery.java | 33 +++
.../spark/SparkTransformPluginDiscovery.java | 45 ++++
24 files changed, 821 insertions(+), 307 deletions(-)
diff --git a/pom.xml b/pom.xml
index 9a5382b8..893a8bd3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
<module>seatunnel-e2e</module>
<module>seatunnel-api</module>
<module>seatunnel-translation</module>
+ <module>seatunnel-plugin-discovery</module>
</modules>
<properties>
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index 5280250f..dd74c022 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -32,8 +32,13 @@ public class Common {
throw new IllegalStateException("Utility class");
}
+ /**
+ * Used to set the size when create a new collection(just to pass the
checkstyle).
+ */
+ public static final int COLLECTION_SIZE = 16;
+
private static final List<String> ALLOWED_MODES =
Arrays.stream(DeployMode.values())
- .map(DeployMode::getName).collect(Collectors.toList());
+ .map(DeployMode::getName).collect(Collectors.toList());
private static Optional<String> MODE = Optional.empty();
diff --git a/seatunnel-core/seatunnel-core-base/pom.xml
b/seatunnel-core/seatunnel-core-base/pom.xml
index 1248fb91..77bd13d3 100644
--- a/seatunnel-core/seatunnel-core-base/pom.xml
+++ b/seatunnel-core/seatunnel-core-base/pom.xml
@@ -50,6 +50,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-plugin-discovery</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionContext.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
similarity index 59%
rename from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionContext.java
rename to
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
index 7e91a368..43ef94c4 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionContext.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
@@ -22,37 +22,35 @@ import org.apache.seatunnel.apis.base.api.BaseSource;
import org.apache.seatunnel.apis.base.api.BaseTransform;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import java.net.URL;
+import java.util.Arrays;
import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* The ExecutionContext contains all configuration needed to run the job.
*
* @param <ENVIRONMENT> environment type.
*/
-public class ExecutionContext<ENVIRONMENT extends RuntimeEnv> {
+public abstract class AbstractExecutionContext<ENVIRONMENT extends RuntimeEnv>
{
private final Config config;
private final EngineType engine;
private final ENVIRONMENT environment;
private final JobMode jobMode;
- private final List<BaseSource<ENVIRONMENT>> sources;
- private final List<BaseTransform<ENVIRONMENT>> transforms;
- private final List<BaseSink<ENVIRONMENT>> sinks;
- public ExecutionContext(Config config, EngineType engine) {
+ public AbstractExecutionContext(Config config, EngineType engine) {
this.config = config;
this.engine = engine;
this.environment = new EnvironmentFactory<ENVIRONMENT>(config,
engine).getEnvironment();
this.jobMode = environment.getJobMode();
- PluginFactory<ENVIRONMENT> pluginFactory = new PluginFactory<>(config,
engine);
- this.environment.registerPlugin(pluginFactory.getPluginJarPaths());
- this.sources = pluginFactory.createPlugins(PluginType.SOURCE);
- this.transforms = pluginFactory.createPlugins(PluginType.TRANSFORM);
- this.sinks = pluginFactory.createPlugins(PluginType.SINK);
}
public Config getRootConfig() {
@@ -71,15 +69,26 @@ public class ExecutionContext<ENVIRONMENT extends
RuntimeEnv> {
return jobMode;
}
- public List<BaseSource<ENVIRONMENT>> getSources() {
- return sources;
- }
+ public abstract List<BaseSource<ENVIRONMENT>> getSources();
- public List<BaseTransform<ENVIRONMENT>> getTransforms() {
- return transforms;
- }
+ public abstract List<BaseTransform<ENVIRONMENT>> getTransforms();
+
+ public abstract List<BaseSink<ENVIRONMENT>> getSinks();
+
+ public abstract List<URL> getPluginJars();
- public List<BaseSink<ENVIRONMENT>> getSinks() {
- return sinks;
+ @SuppressWarnings("checkstyle:Indentation")
+ protected List<PluginIdentifier> getPluginIdentifiers(PluginType...
pluginTypes) {
+ return Arrays.stream(pluginTypes).flatMap(new Function<PluginType,
Stream<PluginIdentifier>>() {
+ @Override
+ public Stream<PluginIdentifier> apply(PluginType pluginType) {
+ List<? extends Config> configList =
config.getConfigList(pluginType.getType());
+ return configList.stream()
+ .map(pluginConfig -> PluginIdentifier
+ .of(engine.getEngine(),
+ pluginType.getType(),
+ pluginConfig.getString("plugin_name")));
+ }
+ }).collect(Collectors.toList());
}
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 758ec8c8..b3f7eaff 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -32,7 +32,6 @@ import java.nio.file.Path;
/**
* Used to build the {@link Config} from file.
*
- * @param <ENVIRONMENT> environment type.
*/
public class ConfigBuilder {
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
index 62b807d1..0c4fd178 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
@@ -42,9 +42,9 @@ public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv>
{
private static final Logger LOGGER =
LoggerFactory.getLogger(ExecutionFactory.class);
- public ExecutionContext<ENVIRONMENT> executionContext;
+ public AbstractExecutionContext<ENVIRONMENT> executionContext;
- public ExecutionFactory(ExecutionContext<ENVIRONMENT> executionContext) {
+ public ExecutionFactory(AbstractExecutionContext<ENVIRONMENT>
executionContext) {
this.executionContext = executionContext;
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
deleted file mode 100644
index 82b51ef1..00000000
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.base.config;
-
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.apis.base.plugin.Plugin;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.flink.BaseFlinkSink;
-import org.apache.seatunnel.flink.BaseFlinkSource;
-import org.apache.seatunnel.flink.BaseFlinkTransform;
-import org.apache.seatunnel.spark.BaseSparkSink;
-import org.apache.seatunnel.spark.BaseSparkSource;
-import org.apache.seatunnel.spark.BaseSparkTransform;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.ServiceConfigurationError;
-import java.util.ServiceLoader;
-import java.util.stream.Collectors;
-
-/**
- * Used to load the plugins.
- *
- * @param <ENVIRONMENT> environment
- */
-public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PluginFactory.class);
- private final Config config;
- private final EngineType engineType;
- private static final Map<EngineType, Map<PluginType, Class<?>>>
PLUGIN_BASE_CLASS_MAP;
-
- private static final String PLUGIN_NAME_KEY = "plugin_name";
- private static final String PLUGIN_MAPPING_FILE =
"plugin-mapping.properties";
-
- private final List<URL> pluginJarPaths;
- private final ClassLoader defaultClassLoader;
-
- static {
- PLUGIN_BASE_CLASS_MAP = new HashMap<>();
- Map<PluginType, Class<?>> sparkBaseClassMap = new HashMap<>();
- sparkBaseClassMap.put(PluginType.SOURCE, BaseSparkSource.class);
- sparkBaseClassMap.put(PluginType.TRANSFORM, BaseSparkTransform.class);
- sparkBaseClassMap.put(PluginType.SINK, BaseSparkSink.class);
- PLUGIN_BASE_CLASS_MAP.put(EngineType.SPARK, sparkBaseClassMap);
-
- Map<PluginType, Class<?>> flinkBaseClassMap = new HashMap<>();
- flinkBaseClassMap.put(PluginType.SOURCE, BaseFlinkSource.class);
- flinkBaseClassMap.put(PluginType.TRANSFORM, BaseFlinkTransform.class);
- flinkBaseClassMap.put(PluginType.SINK, BaseFlinkSink.class);
- PLUGIN_BASE_CLASS_MAP.put(EngineType.FLINK, flinkBaseClassMap);
- }
-
- public PluginFactory(Config config, EngineType engineType) {
- this.config = config;
- this.engineType = engineType;
- this.pluginJarPaths = searchPluginJar();
- this.defaultClassLoader =
initClassLoaderWithPaths(this.pluginJarPaths);
- }
-
- private ClassLoader initClassLoaderWithPaths(List<URL> pluginJarPaths) {
- return new URLClassLoader(pluginJarPaths.toArray(new URL[0]),
- Thread.currentThread().getContextClassLoader());
- }
-
- @Nonnull
- private List<URL> searchPluginJar() {
-
- File pluginDir =
Common.connectorJarDir(this.engineType.getEngine()).toFile();
- if (!pluginDir.exists() || pluginDir.listFiles() == null) {
- return new ArrayList<>();
- }
- Config pluginMapping = ConfigFactory
- .parseFile(new File(getPluginMappingPath()))
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(ConfigFactory.systemProperties(),
-
ConfigResolveOptions.defaults().setAllowUnresolved(true));
- File[] plugins =
- Arrays.stream(pluginDir.listFiles()).filter(f ->
f.getName().endsWith(".jar")).toArray(File[]::new);
-
- return Arrays.stream(PluginType.values()).filter(type ->
!PluginType.TRANSFORM.equals(type))
- .flatMap(type -> {
- List<URL> pluginList = new ArrayList<>();
- List<? extends Config> configList =
config.getConfigList(type.getType());
- configList.forEach(pluginConfig -> {
-
- if (containPluginMappingValue(pluginMapping, type,
pluginConfig.getString(PLUGIN_NAME_KEY))) {
- try {
- for (File plugin : plugins) {
- if
(plugin.getName().startsWith(getPluginMappingValue(pluginMapping, type,
-
pluginConfig.getString(PLUGIN_NAME_KEY)))) {
- pluginList.add(plugin.toURI().toURL());
- break;
- }
- }
- } catch (MalformedURLException e) {
- LOGGER.warn("can get plugin url", e);
- }
- } else {
- throw new
IllegalArgumentException(String.format("can't find connector %s in " +
- "%s. If you add connector to
connectors dictionary, please modify this " +
- "file.", getPluginMappingKey(type,
pluginConfig.getString(PLUGIN_NAME_KEY)),
- getPluginMappingPath()));
- }
-
- });
- return pluginList.stream();
- }).collect(Collectors.toList());
- }
-
- public List<URL> getPluginJarPaths() {
- return this.pluginJarPaths;
- }
-
- private String getPluginMappingPath() {
- return Common.connectorDir() + "/" + PLUGIN_MAPPING_FILE;
- }
-
- private String getPluginMappingKey(PluginType type, String pluginName) {
- return this.engineType.getEngine() + "." + type.getType() + "." +
pluginName;
-
- }
-
- private String getPluginMappingValue(Config pluginMapping, PluginType
type, String pluginName) {
- return
pluginMapping.getConfig(this.engineType.getEngine()).getConfig(type.getType()).getString(pluginName);
- }
-
- private boolean containPluginMappingValue(Config pluginMapping, PluginType
type, String pluginName) {
- if (pluginMapping.hasPath(this.engineType.getEngine())) {
- Config engine =
pluginMapping.getConfig(this.engineType.getEngine());
- if (engine.hasPath(type.getType())) {
- Config plugins = engine.getConfig(type.getType());
- return plugins.hasPath(pluginName);
- }
- }
- return false;
- }
-
- /**
- * Create the plugins by plugin type.
- *
- * @param type plugin type
- * @param <T> plugin
- * @return plugin list.
- */
- @SuppressWarnings("unchecked")
- public <T extends Plugin<ENVIRONMENT>> List<T> createPlugins(PluginType
type) {
- Objects.requireNonNull(type, "PluginType can not be null when create
plugins!");
- List<T> basePluginList = new ArrayList<>();
- List<? extends Config> configList =
config.getConfigList(type.getType());
- configList.forEach(plugin -> {
- try {
- T t = (T) createPluginInstanceIgnoreCase(type,
plugin.getString(PLUGIN_NAME_KEY), this.defaultClassLoader);
- t.setConfig(plugin);
- basePluginList.add(t);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
-
- return basePluginList;
- }
-
- /**
- * create plugin class instance, ignore case.
- **/
- @SuppressWarnings("unchecked")
- private Plugin<?> createPluginInstanceIgnoreCase(PluginType pluginType,
String pluginName,
- ClassLoader classLoader)
throws Exception {
- Class<Plugin<?>> pluginBaseClass = (Class<Plugin<?>>)
getPluginBaseClass(engineType, pluginType);
-
- if (pluginName.split("\\.").length != 1) {
- // canonical class name
- Class<Plugin<?>> pluginClass = (Class<Plugin<?>>)
Class.forName(pluginName);
- if (pluginClass.isAssignableFrom(pluginBaseClass)) {
- throw new IllegalArgumentException("plugin: " + pluginName + "
is not extends from " + pluginBaseClass);
- }
- return pluginClass.getDeclaredConstructor().newInstance();
- }
- ServiceLoader<Plugin<?>> plugins = ServiceLoader.load(pluginBaseClass,
classLoader);
- for (Iterator<Plugin<?>> it = plugins.iterator(); it.hasNext(); ) {
- try {
- Plugin<?> plugin = it.next();
- if (StringUtils.equalsIgnoreCase(plugin.getPluginName(),
pluginName)) {
- return plugin;
- }
- } catch (ServiceConfigurationError e) {
- // Iterator.next() may throw ServiceConfigurationError,
- // but maybe caused by a not used plugin in this job
- LOGGER.warn("Error when load plugin: [{}]", pluginName, e);
- }
- }
- throw new ClassNotFoundException("Plugin class not found by name :[" +
pluginName + "]");
- }
-
- private Class<?> getPluginBaseClass(EngineType engineType, PluginType
pluginType) {
- if (!PLUGIN_BASE_CLASS_MAP.containsKey(engineType)) {
- throw new IllegalStateException("PluginType not support : [" +
pluginType + "]");
- }
- Map<PluginType, Class<?>> pluginTypeClassMap =
PLUGIN_BASE_CLASS_MAP.get(engineType);
- if (!pluginTypeClassMap.containsKey(pluginType)) {
- throw new IllegalStateException(pluginType + " is not supported in
engine " + engineType);
- }
- return pluginTypeClassMap.get(pluginType);
- }
-
-}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
index 345f8414..0479a552 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
@@ -24,11 +24,11 @@ import org.apache.seatunnel.apis.base.env.Execution;
import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.ExecutionContext;
import org.apache.seatunnel.core.base.config.ExecutionFactory;
import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkExecutionContext;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -53,7 +53,7 @@ public class FlinkApiTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkComm
Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
Config config = new ConfigBuilder(configFile).getConfig();
- ExecutionContext<FlinkEnvironment> executionContext = new
ExecutionContext<>(config, engine);
+ FlinkExecutionContext executionContext = new
FlinkExecutionContext(config, engine);
List<BaseSource<FlinkEnvironment>> sources =
executionContext.getSources();
List<BaseTransform<FlinkEnvironment>> transforms =
executionContext.getTransforms();
List<BaseSink<FlinkEnvironment>> sinks = executionContext.getSinks();
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
index 7dca74ec..447603aa 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
@@ -20,8 +20,6 @@ package org.apache.seatunnel.core.flink.config;
import org.apache.seatunnel.core.base.config.ConfigChecker;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.config.EnvironmentFactory;
-import org.apache.seatunnel.core.base.config.PluginFactory;
-import org.apache.seatunnel.core.base.config.PluginType;
import org.apache.seatunnel.core.base.exception.ConfigCheckException;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -35,10 +33,10 @@ public class FlinkApiConfigChecker implements
ConfigChecker<FlinkEnvironment> {
// check environment
FlinkEnvironment environment = new
EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
// check plugins
- PluginFactory<FlinkEnvironment> pluginFactory = new
PluginFactory<>(config, EngineType.FLINK);
- pluginFactory.createPlugins(PluginType.SOURCE);
- pluginFactory.createPlugins(PluginType.TRANSFORM);
- pluginFactory.createPlugins(PluginType.SINK);
+ FlinkExecutionContext flinkExecutionContext = new
FlinkExecutionContext(config, EngineType.FLINK);
+ flinkExecutionContext.getSources();
+ flinkExecutionContext.getTransforms();
+ flinkExecutionContext.getSinks();
} catch (Exception ex) {
throw new ConfigCheckException("Config check fail", ex);
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
new file mode 100644
index 00000000..24cb9d82
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.config;
+
+import org.apache.seatunnel.apis.base.api.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.core.base.config.AbstractExecutionContext;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.flink.FlinkSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.flink.FlinkSourcePluginDiscovery;
+import
org.apache.seatunnel.plugin.discovery.flink.FlinkTransformPluginDiscovery;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkExecutionContext extends
AbstractExecutionContext<FlinkEnvironment> {
+ private final FlinkSourcePluginDiscovery flinkSourcePluginDiscovery;
+ private final FlinkTransformPluginDiscovery flinkTransformPluginDiscovery;
+ private final FlinkSinkPluginDiscovery flinkSinkPluginDiscovery;
+ private final List<URL> pluginJars;
+
+ public FlinkExecutionContext(Config config, EngineType engine) {
+ super(config, engine);
+ this.flinkSourcePluginDiscovery = new FlinkSourcePluginDiscovery();
+ this.flinkTransformPluginDiscovery = new
FlinkTransformPluginDiscovery();
+ this.flinkSinkPluginDiscovery = new FlinkSinkPluginDiscovery();
+ List<URL> pluginJars = new ArrayList<>();
+ // since we didn't split the transform plugin jars, we just need to
register the source/sink plugin jars
+
pluginJars.addAll(flinkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
+
pluginJars.addAll(flinkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
+ this.pluginJars = pluginJars;
+ this.getEnvironment().registerPlugin(pluginJars);
+ }
+
+ @Override
+ public List<BaseSource<FlinkEnvironment>> getSources() {
+ final String pluginType = PluginType.SOURCE.getType();
+ final String engineType = EngineType.FLINK.getEngine();
+ final List<? extends Config> configList =
getRootConfig().getConfigList(pluginType);
+ return configList.stream()
+ .map(pluginConfig -> {
+ PluginIdentifier pluginIdentifier =
PluginIdentifier.of(engineType, pluginType,
pluginConfig.getString("plugin_name"));
+ BaseSource<FlinkEnvironment> pluginInstance =
flinkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+ pluginInstance.setConfig(pluginConfig);
+ return pluginInstance;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<BaseTransform<FlinkEnvironment>> getTransforms() {
+ final String pluginType = PluginType.TRANSFORM.getType();
+ final String engineType = EngineType.FLINK.getEngine();
+ final List<? extends Config> configList =
getRootConfig().getConfigList(pluginType);
+ return configList.stream()
+ .map(pluginConfig -> {
+ PluginIdentifier pluginIdentifier =
PluginIdentifier.of(engineType, pluginType,
pluginConfig.getString("plugin_name"));
+ BaseTransform<FlinkEnvironment> pluginInstance =
flinkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+ pluginInstance.setConfig(pluginConfig);
+ return pluginInstance;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<BaseSink<FlinkEnvironment>> getSinks() {
+ final String pluginType = PluginType.SINK.getType();
+ final String engineType = EngineType.FLINK.getEngine();
+ final List<? extends Config> configList =
getRootConfig().getConfigList(pluginType);
+ return configList.stream()
+ .map(pluginConfig -> {
+ PluginIdentifier pluginIdentifier =
PluginIdentifier.of(engineType, pluginType,
pluginConfig.getString("plugin_name"));
+ BaseSink<FlinkEnvironment> pluginInstance =
flinkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+ pluginInstance.setConfig(pluginConfig);
+ return pluginInstance;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<URL> getPluginJars() {
+ return pluginJars;
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 41b53080..4675cb2e 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -19,16 +19,15 @@ package org.apache.seatunnel.core.spark;
import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.base.Starter;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.PluginFactory;
import org.apache.seatunnel.core.base.utils.CompressionUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.config.SparkExecutionContext;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -216,13 +215,13 @@ public class SparkStarter implements Starter {
* return connector's jars, which located in 'connectors/spark/*'.
*/
private List<Path> getConnectorJarDependencies() {
- Path pluginRootDir = Common.connectorJarDir("SPARK");
+ Path pluginRootDir = Common.connectorJarDir("spark");
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir))
{
return Collections.emptyList();
}
Config config = new
ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
- PluginFactory<RuntimeEnv> pluginFactory = new PluginFactory<>(config,
EngineType.SPARK);
- return pluginFactory.getPluginJarPaths().stream().map(url -> new
File(url.getPath()).toPath()).collect(Collectors.toList());
+ SparkExecutionContext sparkExecutionContext = new
SparkExecutionContext(config, EngineType.SPARK);
+ return sparkExecutionContext.getPluginJars().stream().map(url -> new
File(url.getPath()).toPath()).collect(Collectors.toList());
}
/**
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index 9a051416..e9c54b44 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -24,11 +24,11 @@ import org.apache.seatunnel.apis.base.env.Execution;
import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.ExecutionContext;
import org.apache.seatunnel.core.base.config.ExecutionFactory;
import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.config.SparkExecutionContext;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -50,7 +50,7 @@ public class SparkTaskExecuteCommand extends
BaseTaskExecuteCommand<SparkCommand
Path confFile = FileUtils.getConfigPath(sparkCommandArgs);
Config config = new ConfigBuilder(confFile).getConfig();
- ExecutionContext<SparkEnvironment> executionContext = new
ExecutionContext<>(config, engine);
+ SparkExecutionContext executionContext = new
SparkExecutionContext(config, engine);
List<BaseSource<SparkEnvironment>> sources =
executionContext.getSources();
List<BaseTransform<SparkEnvironment>> transforms =
executionContext.getTransforms();
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
index 26de5794..776e4d66 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
@@ -20,8 +20,6 @@ package org.apache.seatunnel.core.spark.config;
import org.apache.seatunnel.core.base.config.ConfigChecker;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.config.EnvironmentFactory;
-import org.apache.seatunnel.core.base.config.PluginFactory;
-import org.apache.seatunnel.core.base.config.PluginType;
import org.apache.seatunnel.core.base.exception.ConfigCheckException;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -33,12 +31,12 @@ public class SparkApiConfigChecker implements
ConfigChecker<FlinkEnvironment> {
public void checkConfig(Config config) throws ConfigCheckException {
try {
// check environment
- FlinkEnvironment environment = new
EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+ FlinkEnvironment environment = new
EnvironmentFactory<FlinkEnvironment>(config, EngineType.SPARK).getEnvironment();
// check plugins
- PluginFactory<FlinkEnvironment> pluginFactory = new
PluginFactory<>(config, EngineType.FLINK);
- pluginFactory.createPlugins(PluginType.SOURCE);
- pluginFactory.createPlugins(PluginType.TRANSFORM);
- pluginFactory.createPlugins(PluginType.SINK);
+ SparkExecutionContext sparkExecutionContext = new
SparkExecutionContext(config, EngineType.SPARK);
+ sparkExecutionContext.getSources();
+ sparkExecutionContext.getTransforms();
+ sparkExecutionContext.getSinks();
} catch (Exception ex) {
throw new ConfigCheckException("Config check fail", ex);
}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
new file mode 100644
index 00000000..13d1c0f3
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.config;
+
+import org.apache.seatunnel.apis.base.api.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.core.base.config.AbstractExecutionContext;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
+import
org.apache.seatunnel.plugin.discovery.spark.SparkTransformPluginDiscovery;
+import org.apache.seatunnel.spark.SparkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SparkExecutionContext extends
AbstractExecutionContext<SparkEnvironment> {
+ private final SparkSourcePluginDiscovery sparkSourcePluginDiscovery;
+ private final SparkTransformPluginDiscovery sparkTransformPluginDiscovery;
+ private final SparkSinkPluginDiscovery sparkSinkPluginDiscovery;
+ private final List<URL> pluginJars;
+
+ public SparkExecutionContext(Config config, EngineType engine) {
+ super(config, engine);
+ this.sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
+ this.sparkTransformPluginDiscovery = new
SparkTransformPluginDiscovery();
+ this.sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
+ List<URL> pluginJars = new ArrayList<>();
+
pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
+
pluginJars.addAll(sparkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
+ this.pluginJars = pluginJars;
+ this.getEnvironment().registerPlugin(pluginJars);
+ }
+
+ @Override
+ public List<BaseSource<SparkEnvironment>> getSources() {
+ final String pluginType = PluginType.SOURCE.getType();
+ final String engineType = EngineType.SPARK.getEngine();
+ final List<? extends Config> configList =
getRootConfig().getConfigList(pluginType);
+ return configList.stream()
+ .map(pluginConfig -> {
+ PluginIdentifier pluginIdentifier =
PluginIdentifier.of(engineType, pluginType,
pluginConfig.getString("plugin_name"));
+ BaseSource<SparkEnvironment> pluginInstance =
sparkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+ pluginInstance.setConfig(pluginConfig);
+ return pluginInstance;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<BaseTransform<SparkEnvironment>> getTransforms() {
+ final String pluginType = PluginType.TRANSFORM.getType();
+ final String engineType = EngineType.SPARK.getEngine();
+ final List<? extends Config> configList =
getRootConfig().getConfigList(pluginType);
+ return configList.stream()
+ .map(pluginConfig -> {
+ PluginIdentifier pluginIdentifier =
PluginIdentifier.of(engineType, pluginType,
pluginConfig.getString("plugin_name"));
+ BaseTransform<SparkEnvironment> pluginInstance =
sparkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+ pluginInstance.setConfig(pluginConfig);
+ return pluginInstance;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<BaseSink<SparkEnvironment>> getSinks() {
+ final String pluginType = PluginType.SINK.getType();
+ final String engineType = EngineType.SPARK.getEngine();
+ final List<? extends Config> configList =
getRootConfig().getConfigList(pluginType);
+ return configList.stream()
+ .map(pluginConfig -> {
+ PluginIdentifier pluginIdentifier =
PluginIdentifier.of(engineType, pluginType,
pluginConfig.getString("plugin_name"));
+ BaseSink<SparkEnvironment> pluginInstance =
sparkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+ pluginInstance.setConfig(pluginConfig);
+ return pluginInstance;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<URL> getPluginJars() {
+ return pluginJars;
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-base/pom.xml
b/seatunnel-plugin-discovery/pom.xml
similarity index 79%
copy from seatunnel-core/seatunnel-core-base/pom.xml
copy to seatunnel-plugin-discovery/pom.xml
index 1248fb91..378e0670 100644
--- a/seatunnel-core/seatunnel-core-base/pom.xml
+++ b/seatunnel-plugin-discovery/pom.xml
@@ -1,64 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
-
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
http://www.apache.org/licenses/LICENSE-2.0
-
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core</artifactId>
+ <artifactId>seatunnel</artifactId>
<version>${revision}</version>
- <relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-core-base</artifactId>
+ <artifactId>seatunnel-plugin-discovery</artifactId>
<dependencies>
-
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-api-base</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api-flink</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api-spark</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.beust</groupId>
- <artifactId>jcommander</artifactId>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-common</artifactId>
+ <version>${project.version}</version>
</dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
new file mode 100644
index 00000000..8c90dbab
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery;
+
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.common.config.Common;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T>
{
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractPluginDiscovery.class);
+ private final Path pluginDir;
+
+ protected final ConcurrentHashMap<PluginIdentifier, Optional<T>>
pluginInstanceMap =
+ new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+ protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>>
pluginJarPath =
+ new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+
+ public AbstractPluginDiscovery(String pluginSubDir) {
+ this.pluginDir = Common.connectorJarDir(pluginSubDir);
+ LOGGER.info("Load {} Plugin from {}",
getPluginBaseClass().getSimpleName(), pluginDir);
+ }
+
+ @Override
+ public List<URL> getPluginJarPaths(List<PluginIdentifier>
pluginIdentifiers) {
+ return pluginIdentifiers.stream()
+ .map(this::getPluginJarPath)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
+ return pluginIdentifiers.stream()
+ .map(this::getPluginInstance)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public T getPluginInstance(PluginIdentifier pluginIdentifier) {
+ Optional<T> pluginInstance = pluginInstanceMap
+ .computeIfAbsent(pluginIdentifier, this::createPluginInstance);
+ if (!pluginInstance.isPresent()) {
+ throw new IllegalArgumentException("Can't find plugin: " +
pluginIdentifier);
+ }
+ return pluginInstance.get();
+ }
+
+ /**
+ * Get the plugin instance.
+ *
+ * @param pluginIdentifier plugin identifier.
+ * @return plugin instance.
+ */
+ protected Optional<URL> getPluginJarPath(PluginIdentifier
pluginIdentifier) {
+ return pluginJarPath.computeIfAbsent(pluginIdentifier,
this::findPluginJarPath);
+ }
+
+ /**
+ * Get spark plugin interface.
+ *
+ * @return plugin base class.
+ */
+ protected abstract Class<T> getPluginBaseClass();
+
+ /**
+ * Find the plugin jar path;
+ *
+ * @param pluginIdentifier plugin identifier.
+ * @return plugin jar path.
+ */
+ private Optional<URL> findPluginJarPath(PluginIdentifier pluginIdentifier)
{
+ if (PLUGIN_JAR_MAPPING.isEmpty()) {
+ return Optional.empty();
+ }
+ final String engineType =
pluginIdentifier.getEngineType().toLowerCase();
+ final String pluginType =
pluginIdentifier.getPluginType().toLowerCase();
+ final String pluginName =
pluginIdentifier.getPluginName().toLowerCase();
+ if (!PLUGIN_JAR_MAPPING.hasPath(engineType)) {
+ return Optional.empty();
+ }
+ Config engineConfig = PLUGIN_JAR_MAPPING.getConfig(engineType);
+ if (!engineConfig.hasPath(pluginType)) {
+ return Optional.empty();
+ }
+ Config typeConfig = engineConfig.getConfig(pluginType);
+ Optional<Map.Entry<String, ConfigValue>> optional =
typeConfig.entrySet().stream()
+ .filter(entry -> StringUtils.equalsIgnoreCase(entry.getKey(),
pluginName))
+ .findFirst();
+ if (!optional.isPresent()) {
+ return Optional.empty();
+ }
+ String pluginJarPrefix =
optional.get().getValue().unwrapped().toString();
+ File[] targetPluginFiles = pluginDir.toFile().listFiles(new
FileFilter() {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().endsWith(".jar") &&
StringUtils.startsWithIgnoreCase(pathname.getName(), pluginJarPrefix);
+ }
+ });
+ if (ArrayUtils.isEmpty(targetPluginFiles)) {
+ return Optional.empty();
+ }
+ try {
+ URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
+ LOGGER.info("Discovery plugin jar: {} at: {}",
pluginIdentifier.getPluginName(), pluginJarPath);
+ return Optional.of(pluginJarPath);
+ } catch (MalformedURLException e) {
+ LOGGER.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
+ return Optional.empty();
+ }
+ }
+
+ private Optional<T> createPluginInstance(PluginIdentifier
pluginIdentifier) {
+ Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
+ ClassLoader classLoader;
+ // if the plugin jar not exist in plugin dir, will load from classpath.
+ if (pluginJarPath.isPresent()) {
+ LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier,
pluginJarPath.get());
+ classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()},
Thread.currentThread().getContextClassLoader());
+ } else {
+ LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
+ classLoader = Thread.currentThread().getContextClassLoader();
+ }
+ ServiceLoader<T> serviceLoader =
ServiceLoader.load(getPluginBaseClass(), classLoader);
+ for (T t : serviceLoader) {
+ // todo: add plugin identifier interface to support new api
interface.
+ Plugin<?> pluginInstance = (Plugin<?>) t;
+ if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(),
pluginIdentifier.getPluginName())) {
+ return Optional.of((T) pluginInstance);
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
new file mode 100644
index 00000000..cdc85860
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery;
+
+import org.apache.seatunnel.common.config.Common;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Plugins discovery interface, used to find plugin. Each plugin type should
have its own implementation.
+ *
+ * @param <T> plugin type
+ */
+public interface PluginDiscovery<T> {
+
+ String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
+ /**
+ * The plugin mapping config.
+ * e,g.flink.source.DruidSource=seatunnel-connector-flink-druid
+ */
+ Config PLUGIN_JAR_MAPPING =
+ ConfigFactory
+ // todo: rename to plugin dir
+
.parseFile(Common.connectorDir().resolve(PLUGIN_MAPPING_FILE).toFile())
+ .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+ /**
+ * Get all plugin jar paths.
+ *
+ * @return plugin jars.
+ */
+ List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers);
+
+ /**
+ * Get plugin instance by plugin identifier.
+ *
+ * @param pluginIdentifier plugin identifier.
+ * @return plugin instance. If not found, throw IllegalArgumentException.
+ */
+ T getPluginInstance(PluginIdentifier pluginIdentifier);
+
+ /**
+ * Get all plugin instances.
+ *
+ * @return plugin instances.
+ */
+ List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers);
+
+}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
new file mode 100644
index 00000000..97d6e9f8
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery;
+
+import java.util.Objects;
+
+/**
+ * Used to identify a plugin.
+ */
+public class PluginIdentifier {
+ private final String engineType;
+ private final String pluginType;
+ private final String pluginName;
+
+ private PluginIdentifier(String engineType, String pluginType, String
pluginName) {
+ this.engineType = engineType;
+ this.pluginType = pluginType;
+ this.pluginName = pluginName;
+ }
+
+ public static PluginIdentifier of(String engineType, String pluginType,
String pluginName) {
+ return new PluginIdentifier(engineType, pluginType, pluginName);
+ }
+
+ public String getEngineType() {
+ return engineType;
+ }
+
+ public String getPluginType() {
+ return pluginType;
+ }
+
+ public String getPluginName() {
+ return pluginName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ PluginIdentifier that = (PluginIdentifier) o;
+
+ if (!Objects.equals(engineType, that.engineType)) {
+ return false;
+ }
+ if (!Objects.equals(pluginType, that.pluginType)) {
+ return false;
+ }
+ return Objects.equals(pluginName, that.pluginName);
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:magicnumber")
+ public int hashCode() {
+ int result = engineType != null ? engineType.hashCode() : 0;
+ result = 31 * result + (pluginType != null ? pluginType.hashCode() :
0);
+ result = 31 * result + (pluginName != null ? pluginName.hashCode() :
0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "PluginIdentifier{" +
+ "engineType='" + engineType + '\'' +
+ ", pluginType='" + pluginType + '\'' +
+ ", pluginName='" + pluginName + '\'' +
+ '}';
+ }
+}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
new file mode 100644
index 00000000..a2185a4a
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.flink;
+
+import org.apache.seatunnel.flink.BaseFlinkSink;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+
+public class FlinkSinkPluginDiscovery extends
AbstractPluginDiscovery<BaseFlinkSink> {
+
+ public FlinkSinkPluginDiscovery() {
+ super("flink");
+ }
+
+ @Override
+ protected Class<BaseFlinkSink> getPluginBaseClass() {
+ return BaseFlinkSink.class;
+ }
+}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
new file mode 100644
index 00000000..24ff89e0
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.flink;
+
+import org.apache.seatunnel.flink.BaseFlinkSource;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+
+public class FlinkSourcePluginDiscovery extends
AbstractPluginDiscovery<BaseFlinkSource> {
+ public FlinkSourcePluginDiscovery() {
+ super("flink");
+ }
+
+ @Override
+ protected Class<BaseFlinkSource> getPluginBaseClass() {
+ return BaseFlinkSource.class;
+ }
+}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
new file mode 100644
index 00000000..cda5ab6b
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.flink;
+
+import org.apache.seatunnel.flink.BaseFlinkTransform;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FlinkTransformPluginDiscovery extends
AbstractPluginDiscovery<BaseFlinkTransform> {
+
+ public FlinkTransformPluginDiscovery() {
+ super("flink");
+ }
+
+ public List<URL> getPluginJarPaths(List<PluginIdentifier>
pluginIdentifiers) {
+ return new ArrayList<>();
+ }
+
+ @Override
+ protected Class<BaseFlinkTransform> getPluginBaseClass() {
+ return BaseFlinkTransform.class;
+ }
+}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSinkPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSinkPluginDiscovery.java
new file mode 100644
index 00000000..8a30c68e
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSinkPluginDiscovery.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.spark;
+
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.spark.BaseSparkSink;
+
+public class SparkSinkPluginDiscovery extends
AbstractPluginDiscovery<BaseSparkSink> {
+
+ public SparkSinkPluginDiscovery() {
+ super("spark");
+ }
+
+ @Override
+ protected Class<BaseSparkSink> getPluginBaseClass() {
+ return BaseSparkSink.class;
+ }
+}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSourcePluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSourcePluginDiscovery.java
new file mode 100644
index 00000000..ee961bf9
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSourcePluginDiscovery.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.spark;
+
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.spark.BaseSparkSource;
+
+public class SparkSourcePluginDiscovery extends
AbstractPluginDiscovery<BaseSparkSource> {
+
+ public SparkSourcePluginDiscovery() {
+ super("spark");
+ }
+
+ @Override
+ protected Class<BaseSparkSource> getPluginBaseClass() {
+ return BaseSparkSource.class;
+ }
+}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
new file mode 100644
index 00000000..849fabbd
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.spark;
+
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.spark.BaseSparkTransform;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Transform plugin will load from the classpath.
+ */
+public class SparkTransformPluginDiscovery extends
AbstractPluginDiscovery<BaseSparkTransform> {
+
+ public SparkTransformPluginDiscovery() {
+ super("spark");
+ }
+
+ public List<URL> getPluginJarPaths(List<PluginIdentifier>
pluginIdentifiers) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected Class<BaseSparkTransform> getPluginBaseClass() {
+ return BaseSparkTransform.class;
+ }
+}