This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 99fa19d2ea [Refactor][core] Unify transformFactory creation logic
(#8574)
99fa19d2ea is described below
commit 99fa19d2eab95b68dd79e27866233bd950efb166
Author: Guangdong Liu <[email protected]>
AuthorDate: Thu Feb 6 11:36:28 2025 +0800
[Refactor][core] Unify transformFactory creation logic (#8574)
---
.../seatunnel/api/table/factory/FactoryUtil.java | 41 +++++++++-
.../seatunnel/common/constants}/EngineType.java | 2 +-
.../seatunnel/core/starter/enums/PluginType.java | 35 ---------
.../core/starter/execution/PluginUtil.java | 87 ----------------------
.../seatunnel/core/starter/flink/FlinkStarter.java | 2 +-
.../core/starter/flink/SeaTunnelFlink.java | 2 +-
.../flink/execution/SinkExecuteProcessor.java | 38 ++++++++--
.../seatunnel/core/starter/flink/FlinkStarter.java | 2 +-
.../core/starter/flink/SeaTunnelFlink.java | 2 +-
.../FlinkAbstractPluginExecuteProcessor.java | 3 +-
.../flink/execution/SinkExecuteProcessor.java | 39 ++++++++--
.../flink/execution/SourceExecuteProcessor.java | 12 +--
.../flink/execution/TransformExecuteProcessor.java | 37 ++++++---
.../core/starter/spark/SeaTunnelSpark.java | 2 +-
.../seatunnel/core/starter/spark/SparkStarter.java | 4 +-
.../spark/execution/SinkExecuteProcessor.java | 34 +++++----
.../core/starter/spark/SeaTunnelSpark.java | 2 +-
.../seatunnel/core/starter/spark/SparkStarter.java | 4 +-
.../spark/execution/SinkExecuteProcessor.java | 34 +++++----
.../spark/execution/SourceExecuteProcessor.java | 11 ++-
.../SparkAbstractPluginExecuteProcessor.java | 2 +-
.../spark/execution/SparkRuntimeEnvironment.java | 2 +-
.../spark/execution/TransformExecuteProcessor.java | 36 ++++++---
.../core/starter/seatunnel/SeaTunnelClient.java | 2 +-
.../core/starter/seatunnel/SeaTunnelServer.java | 2 +-
.../core/parse/MultipleTableJobConfigParser.java | 3 +-
26 files changed, 225 insertions(+), 215 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 30e7b00864..131d50852e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
@@ -37,6 +38,9 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.slf4j.Logger;
@@ -106,7 +110,10 @@ public final class FactoryUtil {
if (fallback) {
source =
fallbackCreateSource.apply(
- PluginIdentifier.of("seatunnel", "source",
factoryId));
+ PluginIdentifier.of(
+ EngineType.SEATUNNEL.getEngine(),
+ PluginType.SOURCE.getType(),
+ factoryId));
source.prepare(options.toConfig());
} else {
@@ -205,7 +212,10 @@ public final class FactoryUtil {
if (fallback) {
SeaTunnelSink sink =
fallbackCreateSink.apply(
- PluginIdentifier.of("seatunnel", "sink",
factoryId));
+ PluginIdentifier.of(
+ EngineType.SEATUNNEL.getEngine(),
+ PluginType.SINK.getType(),
+ factoryId));
sink.prepare(config.toConfig());
sink.setTypeInfo(catalogTable.getSeaTunnelRowType());
@@ -273,6 +283,23 @@ public final class FactoryUtil {
return
factory.getClass().getProtectionDomain().getCodeSource().getLocation();
}
+ public static <T extends Factory> Optional<T> discoverOptionalFactory(
+ ClassLoader classLoader,
+ Class<T> factoryClass,
+ String factoryIdentifier,
+ Function<String, T> discoverOptionalFactoryFunction) {
+
+ if (discoverOptionalFactoryFunction != null) {
+ T apply = discoverOptionalFactoryFunction.apply(factoryIdentifier);
+ if (apply != null) {
+ return Optional.of(apply);
+ } else {
+ return Optional.empty();
+ }
+ }
+ return discoverOptionalFactory(classLoader, factoryClass,
factoryIdentifier);
+ }
+
public static <T extends Factory> Optional<T> discoverOptionalFactory(
ClassLoader classLoader, Class<T> factoryClass, String
factoryIdentifier) {
final List<T> foundFactories = discoverFactories(classLoader,
factoryClass);
@@ -436,4 +463,14 @@ public final class FactoryUtil {
}
return false;
}
+
+ public static void ensureJobModeMatch(JobContext jobContext,
SeaTunnelSource source) {
+ if (jobContext.getJobMode() == JobMode.BATCH
+ && source.getBoundedness()
+ ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "'%s' source don't support off-line job.",
source.getPluginName()));
+ }
+ }
}
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/EngineType.java
similarity index 97%
rename from
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java
rename to
seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/EngineType.java
index 355fa838d4..6f5c79c1ed 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/EngineType.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.enums;
+package org.apache.seatunnel.common.constants;
/** Engine type enum */
public enum EngineType {
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java
deleted file mode 100644
index 5fdec8b71b..0000000000
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.enums;
-
-/** Plugin type enum */
-public enum PluginType {
- SOURCE("source"),
- TRANSFORM("transform"),
- SINK("sink");
-
- private final String type;
-
- PluginType(String type) {
- this.type = type;
- }
-
- public String getType() {
- return type;
- }
-}
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
deleted file mode 100644
index b2b47854e3..0000000000
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.execution;
-
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PluginIdentifier;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.FactoryException;
-import org.apache.seatunnel.common.constants.JobMode;
-import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
-import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Optional;
-
-import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
-
-/** The util used for Spark/Flink to create to SeaTunnelSource etc. */
-@SuppressWarnings("rawtypes")
-public class PluginUtil {
-
- protected static final String ENGINE_TYPE = "seatunnel";
-
- public static Optional<? extends Factory> createTransformFactory(
- SeaTunnelFactoryDiscovery factoryDiscovery,
- SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
- Config transformConfig,
- List<URL> pluginJars) {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE, "transform",
transformConfig.getString(PLUGIN_NAME.key()));
- pluginJars.addAll(
-
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- try {
- return
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
- } catch (FactoryException e) {
- return Optional.empty();
- }
- }
-
- public static Optional<? extends Factory> createSinkFactory(
- SeaTunnelFactoryDiscovery factoryDiscovery,
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
- Config sinkConfig,
- List<URL> pluginJars) {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(ENGINE_TYPE, "sink",
sinkConfig.getString(PLUGIN_NAME.key()));
- pluginJars.addAll(
-
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- try {
- return
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
- } catch (FactoryException e) {
- return Optional.empty();
- }
- }
-
- public static void ensureJobModeMatch(JobContext jobContext,
SeaTunnelSource source) {
- if (jobContext.getJobMode() == JobMode.BATCH
- && source.getBoundedness()
- ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
- throw new UnsupportedOperationException(
- String.format(
- "'%s' source don't support off-line job.",
source.getPluginName()));
- }
- }
-}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index e9d0ba7df2..7d106abbce 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -18,8 +18,8 @@
package org.apache.seatunnel.core.starter.flink;
import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.Starter;
-import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
index 8d1b434801..4356e85bfb 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.core.starter.flink;
+import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 4466844536..b700af5b46 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
@@ -35,9 +36,10 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
@@ -56,6 +58,7 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
@SuppressWarnings({"unchecked", "rawtypes"})
@Slf4j
@@ -77,14 +80,34 @@ public class SinkExecuteProcessor
new SeaTunnelFactoryDiscovery(TableSinkFactory.class,
ADD_URL_TO_CLASSLOADER);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
+ Function<String, TableSinkFactory> discoverOptionalFactoryFunction =
+ pluginName ->
+ (TableSinkFactory)
+ factoryDiscovery
+ .createOptionalPluginInstance(
+ PluginIdentifier.of(
+
EngineType.SEATUNNEL.getEngine(),
+
PluginType.SINK.getType(),
+ pluginName))
+ .orElse(null);
+
return pluginConfigs.stream()
.map(
- sinkConfig ->
- PluginUtil.createSinkFactory(
- factoryDiscovery,
- sinkPluginDiscovery,
- sinkConfig,
- jarPaths))
+ sinkConfig -> {
+ jarPaths.addAll(
+ sinkPluginDiscovery.getPluginJarPaths(
+ Lists.newArrayList(
+ PluginIdentifier.of(
+
EngineType.SEATUNNEL.getEngine(),
+
PluginType.SINK.getType(),
+
sinkConfig.getString(
+
PLUGIN_NAME.key())))));
+ return discoverOptionalFactory(
+ classLoader,
+ TableSinkFactory.class,
+ sinkConfig.getString(PLUGIN_NAME.key()),
+ discoverOptionalFactoryFunction);
+ })
.distinct()
.collect(Collectors.toList());
}
@@ -95,7 +118,6 @@ public class SinkExecuteProcessor
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
DataStreamTableInfo input =
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
- ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
for (int i = 0; i < plugins.size(); i++) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 06cfd5f449..54d1984bf4 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -18,8 +18,8 @@
package org.apache.seatunnel.core.starter.flink;
import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.Starter;
-import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
index 1595da686a..dbae7e5fe9 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.core.starter.flink;
+import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 00614b1d88..aef7d57416 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -36,8 +36,6 @@ import static
org.apache.seatunnel.api.common.CommonOptions.PLUGIN_INPUT;
public abstract class FlinkAbstractPluginExecuteProcessor<T>
implements PluginExecuteProcessor<DataStreamTableInfo,
FlinkRuntimeEnvironment> {
- protected static final String ENGINE_TYPE = "seatunnel";
-
protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER
=
(classLoader, url) -> {
if
(classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
@@ -57,6 +55,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
protected JobContext jobContext;
protected final List<T> plugins;
protected final Config envConfig;
+ protected final ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
protected FlinkAbstractPluginExecuteProcessor(
List<URL> jarPaths,
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index a82d2392e6..d6d8518a28 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
@@ -35,9 +36,10 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
@@ -57,6 +59,7 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
@Slf4j
@SuppressWarnings("unchecked,rawtypes")
@@ -74,18 +77,39 @@ public class SinkExecuteProcessor
@Override
protected List<Optional<? extends Factory>> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
+
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableSinkFactory.class,
ADD_URL_TO_CLASSLOADER);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
+ Function<String, TableSinkFactory> discoverOptionalFactoryFunction =
+ pluginName ->
+ (TableSinkFactory)
+ factoryDiscovery
+ .createOptionalPluginInstance(
+ PluginIdentifier.of(
+
EngineType.SEATUNNEL.getEngine(),
+
PluginType.SINK.getType(),
+ pluginName))
+ .orElse(null);
+
return pluginConfigs.stream()
.map(
- sinkConfig ->
- PluginUtil.createSinkFactory(
- factoryDiscovery,
- sinkPluginDiscovery,
- sinkConfig,
- jarPaths))
+ sinkConfig -> {
+ jarPaths.addAll(
+ sinkPluginDiscovery.getPluginJarPaths(
+ Lists.newArrayList(
+ PluginIdentifier.of(
+
EngineType.SEATUNNEL.getEngine(),
+
PluginType.SINK.getType(),
+
sinkConfig.getString(
+
PLUGIN_NAME.key())))));
+ return discoverOptionalFactory(
+ classLoader,
+ TableSinkFactory.class,
+ sinkConfig.getString(PLUGIN_NAME.key()),
+ discoverOptionalFactoryFunction);
+ })
.distinct()
.collect(Collectors.toList());
}
@@ -96,7 +120,6 @@ public class SinkExecuteProcessor
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
DataStreamTableInfo input =
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
- ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
for (int i = 0; i < plugins.size(); i++) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index d637f2256b..deafcc6cb2 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -30,7 +30,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.core.starter.enums.PluginType;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -53,12 +54,11 @@ import java.util.function.Function;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
-import static
org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch;
@Slf4j
@SuppressWarnings("unchecked,rawtypes")
public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<SourceTableInfo> {
- private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
public SourceExecuteProcessor(
List<URL> jarPaths,
@@ -113,14 +113,16 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
for (Config sourceConfig : pluginConfigs) {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(
- ENGINE_TYPE, PLUGIN_TYPE,
sourceConfig.getString(PLUGIN_NAME.key()));
+ EngineType.SEATUNNEL.getEngine(),
+ PluginType.SOURCE.getType(),
+ sourceConfig.getString(PLUGIN_NAME.key()));
jars.addAll(
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> source =
FactoryUtil.createAndPrepareSource(
ReadonlyConfig.fromConfig(sourceConfig),
- Thread.currentThread().getContextClassLoader(),
+ classLoader,
pluginIdentifier.getPluginName(),
fallbackCreateSource,
(TableSourceFactory)
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 615876c417..fa1380cd00 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -17,9 +17,11 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
@@ -28,8 +30,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
@@ -49,6 +52,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
@SuppressWarnings("unchecked,rawtypes")
@@ -66,23 +70,32 @@ public class TransformExecuteProcessor
@Override
protected List<TableTransformFactory> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
-
- SeaTunnelFactoryDiscovery factoryDiscovery =
- new SeaTunnelFactoryDiscovery(TableTransformFactory.class,
ADD_URL_TO_CLASSLOADER);
SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
new SeaTunnelTransformPluginDiscovery();
+ SeaTunnelFactoryDiscovery factoryDiscovery =
+ new SeaTunnelFactoryDiscovery(TableTransformFactory.class,
ADD_URL_TO_CLASSLOADER);
return pluginConfigs.stream()
.map(
- transformConfig ->
- PluginUtil.createTransformFactory(
- factoryDiscovery,
- transformPluginDiscovery,
- transformConfig,
- jarPaths))
+ transformConfig -> {
+ jarPaths.addAll(
+ transformPluginDiscovery.getPluginJarPaths(
+ Lists.newArrayList(
+ PluginIdentifier.of(
+
EngineType.SEATUNNEL.getEngine(),
+
PluginType.TRANSFORM.getType(),
+
transformConfig.getString(
+
PLUGIN_NAME.key())))));
+ return Optional.of(
+ (TableTransformFactory)
+
factoryDiscovery.createPluginInstance(
+ PluginIdentifier.of(
+
EngineType.SEATUNNEL.getEngine(),
+
PluginType.TRANSFORM.getType(),
+
transformConfig.getString(
+
PLUGIN_NAME.key()))));
+ })
.distinct()
- .filter(Optional::isPresent)
.map(Optional::get)
- .map(e -> (TableTransformFactory) e)
.collect(Collectors.toList());
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
index ca7b2ed4be..fd7dea9287 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.core.starter.spark;
+import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index e9ee482e9c..3325ae38a5 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -23,9 +23,9 @@ import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.Starter;
-import org.apache.seatunnel.core.starter.enums.EngineType;
-import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 6e22576c91..5a9ff3d2e4 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
@@ -33,10 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -56,10 +57,10 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
public class SinkExecuteProcessor
extends SparkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
- private static final String PLUGIN_TYPE = PluginType.SINK.getType();
protected SinkExecuteProcessor(
SparkRuntimeEnvironment sparkRuntimeEnvironment,
@@ -71,19 +72,27 @@ public class SinkExecuteProcessor
@Override
protected List<Optional<? extends Factory>> initializePlugins(
List<? extends Config> pluginConfigs) {
- SeaTunnelFactoryDiscovery factoryDiscovery =
- new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
+ SeaTunnelFactoryDiscovery sinkPluginDiscovery =
+ new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
List<Optional<? extends Factory>> sinks =
pluginConfigs.stream()
.map(
- sinkConfig ->
- PluginUtil.createSinkFactory(
- factoryDiscovery,
- sinkPluginDiscovery,
- sinkConfig,
- pluginJars))
+ sinkConfig -> {
+ pluginJars.addAll(
+
sinkPluginDiscovery.getPluginJarPaths(
+ Lists.newArrayList(
+
PluginIdentifier.of(
+
EngineType.SEATUNNEL
+
.getEngine(),
+
PluginType.SINK.getType(),
+
sinkConfig.getString(
+
PLUGIN_NAME.key())))));
+ return discoverOptionalFactory(
+ classLoader,
+ TableSinkFactory.class,
+
sinkConfig.getString(PLUGIN_NAME.key()));
+ })
.distinct()
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -94,7 +103,6 @@ public class SinkExecuteProcessor
public List<DatasetTableInfo> execute(List<DatasetTableInfo>
upstreamDataStreams)
throws TaskExecuteException {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
- ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
DatasetTableInfo input =
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
index 9b3fde6fd1..e984e19182 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.core.starter.spark;
+import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index dcd6a804b2..3411b785d1 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -23,9 +23,9 @@ import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.Starter;
-import org.apache.seatunnel.core.starter.enums.EngineType;
-import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 74b0e1c1f1..19a07878ed 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
@@ -33,10 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -59,11 +60,11 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
@Slf4j
public class SinkExecuteProcessor
extends SparkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
- private static final String PLUGIN_TYPE = PluginType.SINK.getType();
protected SinkExecuteProcessor(
SparkRuntimeEnvironment sparkRuntimeEnvironment,
@@ -75,19 +76,27 @@ public class SinkExecuteProcessor
@Override
protected List<Optional<? extends Factory>> initializePlugins(
List<? extends Config> pluginConfigs) {
- SeaTunnelFactoryDiscovery factoryDiscovery =
- new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
+ SeaTunnelFactoryDiscovery sinkPluginDiscovery =
+ new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
List<Optional<? extends Factory>> sinks =
pluginConfigs.stream()
.map(
- sinkConfig ->
- PluginUtil.createSinkFactory(
- factoryDiscovery,
- sinkPluginDiscovery,
- sinkConfig,
- new ArrayList<>()))
+ sinkConfig -> {
+ pluginJars.addAll(
+
sinkPluginDiscovery.getPluginJarPaths(
+ Lists.newArrayList(
+
PluginIdentifier.of(
+
EngineType.SEATUNNEL
+
.getEngine(),
+
PluginType.SINK.getType(),
+
sinkConfig.getString(
+
PLUGIN_NAME.key())))));
+ return discoverOptionalFactory(
+ classLoader,
+ TableSinkFactory.class,
+
sinkConfig.getString(PLUGIN_NAME.key()));
+ })
.distinct()
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -98,7 +107,6 @@ public class SinkExecuteProcessor
public List<DatasetTableInfo> execute(List<DatasetTableInfo>
upstreamDataStreams)
throws TaskExecuteException {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
- ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
DatasetTableInfo input =
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 50b3a88814..5ccbb59cbd 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -30,6 +30,8 @@ import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -52,11 +54,10 @@ import java.util.function.Function;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
-import static
org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch;
@SuppressWarnings("rawtypes")
public class SourceExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<SourceTableInfo> {
- private static final String PLUGIN_TYPE = "source";
private Map envOption = new HashMap<String, String>();
public SourceExecuteProcessor(
@@ -124,13 +125,15 @@ public class SourceExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<
for (Config sourceConfig : pluginConfigs) {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(
- ENGINE_TYPE, PLUGIN_TYPE,
sourceConfig.getString(PLUGIN_NAME.key()));
+ EngineType.SEATUNNEL.getEngine(),
+ PluginType.SOURCE.getType(),
+ sourceConfig.getString(PLUGIN_NAME.key()));
jars.addAll(
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> source =
FactoryUtil.createAndPrepareSource(
ReadonlyConfig.fromConfig(sourceConfig),
- Thread.currentThread().getContextClassLoader(),
+ classLoader,
pluginIdentifier.getPluginName(),
fallbackCreateSource,
null);
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
index d9de016446..42b9155483 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
@@ -48,7 +48,7 @@ public abstract class SparkAbstractPluginExecuteProcessor<T>
protected final List<? extends Config> pluginConfigs;
protected final JobContext jobContext;
protected final List<T> plugins;
- protected static final String ENGINE_TYPE = "seatunnel";
+ protected final ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
protected SparkAbstractPluginExecuteProcessor(
SparkRuntimeEnvironment sparkRuntimeEnvironment,
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
index 7e31ca463b..e9bc4e07c9 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.enums.PluginType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.spark.SparkConf;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 492af1ad73..f10135e724 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -17,9 +17,11 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -29,8 +31,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -56,6 +59,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
@Slf4j
@@ -71,25 +75,39 @@ public class TransformExecuteProcessor
@Override
protected List<TableTransformFactory> initializePlugins(List<? extends
Config> pluginConfigs) {
+
SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
new SeaTunnelTransformPluginDiscovery();
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableTransformFactory.class);
+
List<URL> pluginJars = new ArrayList<>();
List<TableTransformFactory> transforms =
pluginConfigs.stream()
.map(
- transformConfig ->
- PluginUtil.createTransformFactory(
- factoryDiscovery,
- transformPluginDiscovery,
- transformConfig,
- new ArrayList<>()))
+ transformConfig -> {
+ pluginJars.addAll(
+
transformPluginDiscovery.getPluginJarPaths(
+ Lists.newArrayList(
+
PluginIdentifier.of(
+
EngineType.SEATUNNEL
+
.getEngine(),
+
PluginType.TRANSFORM.getType(),
+
transformConfig.getString(
+
PLUGIN_NAME.key())))));
+ return Optional.of(
+ (TableTransformFactory)
+
factoryDiscovery.createPluginInstance(
+
PluginIdentifier.of(
+
EngineType.SEATUNNEL
+
.getEngine(),
+
PluginType.TRANSFORM.getType(),
+
transformConfig.getString(
+
PLUGIN_NAME.key()))));
+ })
.distinct()
- .filter(Optional::isPresent)
.map(Optional::get)
- .map(e -> (TableTransformFactory) e)
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
return transforms;
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
index 7a37fd340c..308ef35905 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.core.starter.seatunnel;
+import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
index 96a3e32e51..a340c0acb6 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.core.starter.seatunnel;
+import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 903db02f80..6ec3eaf0cf 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -46,7 +46,6 @@ import
org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -386,7 +385,7 @@ public class MultipleTableJobConfigParser {
String actionName =
JobConfigParser.createSourceActionName(configIndex, factoryId);
SeaTunnelSource<Object, SourceSplit, Serializable> source =
tuple2._1();
source.setJobContext(jobConfig.getJobContext());
- PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
+ FactoryUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
SourceAction<Object, SourceSplit, Serializable> action =
new SourceAction<>(id, actionName, tuple2._1(), factoryUrls,
new HashSet<>());
action.setParallelism(parallelism);