This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a05ba93464 [feature][core] Unified engine initialization connector
logic (#8536)
a05ba93464 is described below
commit a05ba934644e22d51bded244414ff2616302d6a1
Author: Guangdong Liu <[email protected]>
AuthorDate: Wed Jan 22 18:49:58 2025 +0800
[feature][core] Unified engine initialization connector logic (#8536)
---
.../seatunnel/api/common}/PluginIdentifier.java | 2 +-
.../seatunnel/api/table/factory/FactoryUtil.java | 123 ++++++++++++---
.../core/starter/execution/PluginUtil.java | 171 +--------------------
.../flink/execution/SinkExecuteProcessor.java | 84 +++-------
.../flink/execution/SinkExecuteProcessor.java | 84 +++-------
.../flink/execution/SourceExecuteProcessor.java | 41 +++--
.../seatunnel/core/starter/spark/SparkStarter.java | 2 +-
.../spark/execution/SinkExecuteProcessor.java | 57 ++++---
.../seatunnel/core/starter/spark/SparkStarter.java | 2 +-
.../spark/execution/SinkExecuteProcessor.java | 59 ++++---
.../spark/execution/SourceExecuteProcessor.java | 37 +++--
.../SparkAbstractPluginExecuteProcessor.java | 21 +++
.../seatunnel/command/ConnectorCheckCommand.java | 2 +-
.../resources/neo4j/fake_to_neo4j_batch_write.conf | 4 +
.../engine/core/parse/ConnectorInstanceLoader.java | 2 +-
.../engine/core/parse/JobConfigParser.java | 142 -----------------
.../core/parse/MultipleTableJobConfigParser.java | 91 ++++-------
.../plugin/discovery/AbstractPluginDiscovery.java | 1 +
.../plugin/discovery/PluginDiscovery.java | 1 +
.../seatunnel/SeaTunnelFactoryDiscovery.java | 2 +-
.../seatunnel/SeaTunnelSinkPluginDiscovery.java | 2 +-
.../seatunnel/SeaTunnelSourcePluginDiscovery.java | 2 +-
.../SeaTunnelTransformPluginDiscovery.java | 2 +-
.../discovery/AbstractPluginDiscoveryTest.java | 1 +
.../SeaTunnelSourcePluginDiscoveryTest.java | 2 +-
25 files changed, 314 insertions(+), 623 deletions(-)
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifier.java
similarity index 98%
rename from
seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifier.java
index 8540063698..4cdbfe27e4 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifier.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.plugin.discovery;
+package org.apache.seatunnel.api.common;
import org.apache.commons.lang3.StringUtils;
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index e11afd1d19..30e7b00864 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
@@ -36,11 +37,13 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;
import java.io.Serializable;
@@ -51,12 +54,17 @@ import java.util.Map;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
+import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
/**
* Use SPI to create {@link TableSourceFactory}, {@link TableSinkFactory} and
{@link
* CatalogFactory}.
*/
+@Slf4j
public final class FactoryUtil {
private static final Logger LOG =
LoggerFactory.getLogger(FactoryUtil.class);
@@ -65,8 +73,13 @@ public final class FactoryUtil {
public static <T, SplitT extends SourceSplit, StateT extends Serializable>
Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>>
createAndPrepareSource(
- ReadonlyConfig options, ClassLoader classLoader, String
factoryIdentifier) {
- return restoreAndPrepareSource(options, classLoader,
factoryIdentifier, null);
+ ReadonlyConfig options,
+ ClassLoader classLoader,
+ String factoryIdentifier,
+ Function<PluginIdentifier, SeaTunnelSource>
fallbackCreateSource,
+ TableSourceFactory factory) {
+ return restoreAndPrepareSource(
+ options, classLoader, factoryIdentifier, null,
fallbackCreateSource, factory);
}
public static <T, SplitT extends SourceSplit, StateT extends Serializable>
@@ -74,22 +87,46 @@ public final class FactoryUtil {
ReadonlyConfig options,
ClassLoader classLoader,
String factoryIdentifier,
- ChangeStreamTableSourceCheckpoint checkpoint) {
+ ChangeStreamTableSourceCheckpoint checkpoint,
+ Function<PluginIdentifier, SeaTunnelSource>
fallbackCreateSource,
+ TableSourceFactory factory) {
try {
- final TableSourceFactory factory =
- discoverFactory(classLoader, TableSourceFactory.class,
factoryIdentifier);
+
SeaTunnelSource<T, SplitT, StateT> source;
- if (factory instanceof ChangeStreamTableSourceFactory &&
checkpoint != null) {
- ChangeStreamTableSourceFactory changeStreamTableSourceFactory =
- (ChangeStreamTableSourceFactory) factory;
- ChangeStreamTableSourceState<Serializable, SourceSplit> state =
-
changeStreamTableSourceFactory.deserializeTableSourceState(checkpoint);
+ final String factoryId = options.get(PLUGIN_NAME);
+
+ boolean fallback =
+ isFallback(
+ classLoader,
+ TableSourceFactory.class,
+ factoryId,
+ (sourceFactory) ->
sourceFactory.createSource(null));
+
+ if (fallback) {
source =
- restoreAndPrepareSource(
- changeStreamTableSourceFactory, options,
classLoader, state);
+ fallbackCreateSource.apply(
+ PluginIdentifier.of("seatunnel", "source",
factoryId));
+ source.prepare(options.toConfig());
+
} else {
- source = createAndPrepareSource(factory, options, classLoader);
+ if (factory == null) {
+ factory =
+ discoverFactory(
+ classLoader, TableSourceFactory.class,
factoryIdentifier);
+ }
+
+ if (factory instanceof ChangeStreamTableSourceFactory &&
checkpoint != null) {
+ ChangeStreamTableSourceFactory
changeStreamTableSourceFactory =
+ (ChangeStreamTableSourceFactory) factory;
+ ChangeStreamTableSourceState<Serializable, SourceSplit>
state =
+
changeStreamTableSourceFactory.deserializeTableSourceState(checkpoint);
+ source =
+ restoreAndPrepareSource(
+ changeStreamTableSourceFactory, options,
classLoader, state);
+ } else {
+ source = createAndPrepareSource(factory, options,
classLoader);
+ }
}
List<CatalogTable> catalogTables;
try {
@@ -115,6 +152,7 @@ public final class FactoryUtil {
catalogTables.add(catalogTable);
}
return new Tuple2<>(source, catalogTables);
+
} catch (Throwable t) {
throw new FactoryException(
String.format(
@@ -150,17 +188,42 @@ public final class FactoryUtil {
CatalogTable catalogTable,
ReadonlyConfig config,
ClassLoader classLoader,
- String factoryIdentifier) {
+ String factoryIdentifier,
+ Function<PluginIdentifier, SeaTunnelSink>
fallbackCreateSink,
+ TableSinkFactory<IN, StateT, CommitInfoT,
AggregatedCommitInfoT>
+ tableSinkFactory) {
try {
- TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
factory =
- discoverFactory(classLoader, TableSinkFactory.class,
factoryIdentifier);
+ final String factoryId = config.get(PLUGIN_NAME);
+
+ boolean fallback =
+ isFallback(
+ classLoader,
+ TableSinkFactory.class,
+ factoryId,
+ (sinkFactory) -> sinkFactory.createSink(null));
+
+ if (fallback) {
+ SeaTunnelSink sink =
+ fallbackCreateSink.apply(
+ PluginIdentifier.of("seatunnel", "sink",
factoryId));
+ sink.prepare(config.toConfig());
+ sink.setTypeInfo(catalogTable.getSeaTunnelRowType());
+
+ return sink;
+ }
+
+ if (tableSinkFactory == null) {
+ tableSinkFactory =
+ discoverFactory(classLoader, TableSinkFactory.class,
factoryIdentifier);
+ }
+
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
catalogTable,
config,
classLoader,
- factory.excludeTablePlaceholderReplaceKeys());
-
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
+
tableSinkFactory.excludeTablePlaceholderReplaceKeys());
+
ConfigValidator.of(context.getOptions()).validate(tableSinkFactory.optionRule());
LOG.info(
"Create sink '{}' with upstream input
catalog-table[database: {}, schema: {}, table: {}]",
@@ -168,7 +231,7 @@ public final class FactoryUtil {
catalogTable.getTablePath().getDatabaseName(),
catalogTable.getTablePath().getSchemaName(),
catalogTable.getTablePath().getTableName());
- return factory.createSink(context).createSink();
+ return tableSinkFactory.createSink(context).createSink();
} catch (Throwable t) {
throw new FactoryException(
String.format(
@@ -351,4 +414,26 @@ public final class FactoryUtil {
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createTransform(context).createTransform();
}
+
+ private static <T extends Factory> boolean isFallback(
+ ClassLoader classLoader,
+ Class<T> factoryClass,
+ String factoryId,
+ Consumer<T> virtualCreator) {
+ Optional<T> factory = discoverOptionalFactory(classLoader,
factoryClass, factoryId);
+ if (!factory.isPresent()) {
+ return true;
+ }
+ try {
+ virtualCreator.accept(factory.get());
+ } catch (Exception e) {
+ if (e instanceof UnsupportedOperationException
+ && "The Factory has not been implemented and the
deprecated Plugin will be used."
+ .equals(e.getMessage())) {
+ return true;
+ }
+ log.debug(ExceptionUtils.getMessage(e));
+ }
+ return false;
+ }
}
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index a8a245dd9b..b2b47854e3 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -20,40 +20,21 @@ package org.apache.seatunnel.core.starter.execution;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryException;
-import org.apache.seatunnel.api.table.factory.FactoryUtil;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.enums.PluginType;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import java.net.URL;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
-import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
/** The util used for Spark/Flink to create to SeaTunnelSource etc. */
@SuppressWarnings("rawtypes")
@@ -61,75 +42,6 @@ public class PluginUtil {
protected static final String ENGINE_TYPE = "seatunnel";
- public static SourceTableInfo createSource(
- SeaTunnelFactoryDiscovery factoryDiscovery,
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery,
- PluginIdentifier pluginIdentifier,
- Config pluginConfig,
- JobContext jobContext) {
- // get current thread classloader
- ClassLoader classLoader =
- Thread.currentThread()
- .getContextClassLoader(); // try to find factory of
this plugin
-
- final ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(pluginConfig);
- // try to find table source factory
- final Optional<Factory> sourceFactory =
-
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
- final boolean fallback = isFallback(sourceFactory);
- SeaTunnelSource source;
- if (fallback) {
- source = fallbackCreate(sourcePluginDiscovery, pluginIdentifier,
pluginConfig);
- } else {
- // create source with source factory
- TableSourceFactoryContext context =
- new TableSourceFactoryContext(readonlyConfig, classLoader);
-
ConfigValidator.of(context.getOptions()).validate(sourceFactory.get().optionRule());
- TableSource tableSource =
- ((TableSourceFactory)
sourceFactory.get()).createSource(context);
- source = tableSource.createSource();
- }
- source.setJobContext(jobContext);
- ensureJobModeMatch(jobContext, source);
- List<CatalogTable> catalogTables;
- try {
- catalogTables = source.getProducedCatalogTables();
- } catch (UnsupportedOperationException e) {
- // TODO remove it when all connector use `getProducedCatalogTables`
- SeaTunnelDataType<?> seaTunnelDataType = source.getProducedType();
- final String tableId =
-
readonlyConfig.getOptional(CommonOptions.PLUGIN_OUTPUT).orElse(DEFAULT_ID);
- catalogTables =
-
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
- }
- return new SourceTableInfo(source, catalogTables);
- }
-
- private static boolean isFallback(Optional<Factory> factory) {
- if (!factory.isPresent()) {
- return true;
- }
- try {
- ((TableSourceFactory) factory.get()).createSource(null);
- } catch (Exception e) {
- if (e instanceof UnsupportedOperationException
- && "The Factory has not been implemented and the
deprecated Plugin will be used."
- .equals(e.getMessage())) {
- return true;
- }
- }
- return false;
- }
-
- private static SeaTunnelSource fallbackCreate(
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery,
- PluginIdentifier pluginIdentifier,
- Config pluginConfig) {
- SeaTunnelSource source =
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
- source.prepare(pluginConfig);
- return source;
- }
-
public static Optional<? extends Factory> createTransformFactory(
SeaTunnelFactoryDiscovery factoryDiscovery,
SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
@@ -163,87 +75,6 @@ public class PluginUtil {
}
}
- public static SeaTunnelSink createSink(
- Optional<? extends Factory> factory,
- Config sinkConfig,
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
- JobContext jobContext,
- List<CatalogTable> catalogTables,
- ClassLoader classLoader) {
- boolean fallBack = !factory.isPresent() || isFallback(factory.get());
- if (fallBack) {
- SeaTunnelSink sink =
- fallbackCreateSink(
- sinkPluginDiscovery,
- PluginIdentifier.of(
- ENGINE_TYPE,
- PluginType.SINK.getType(),
- sinkConfig.getString(PLUGIN_NAME.key())),
- sinkConfig);
- sink.setJobContext(jobContext);
- sink.setTypeInfo(catalogTables.get(0).getSeaTunnelRowType());
- return sink;
- } else {
- if (catalogTables.size() > 1) {
- Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
- ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(sinkConfig);
- catalogTables.forEach(
- catalogTable -> {
- TableSinkFactoryContext context =
-
TableSinkFactoryContext.replacePlaceholderAndCreate(
- catalogTable,
-
ReadonlyConfig.fromConfig(sinkConfig),
- classLoader,
- ((TableSinkFactory) factory.get())
-
.excludeTablePlaceholderReplaceKeys());
- ConfigValidator.of(context.getOptions())
- .validate(factory.get().optionRule());
- SeaTunnelSink action =
- ((TableSinkFactory) factory.get())
- .createSink(context)
- .createSink();
- action.setJobContext(jobContext);
- sinks.put(catalogTable.getTablePath(), action);
- });
- return FactoryUtil.createMultiTableSink(sinks, readonlyConfig,
classLoader);
- }
- TableSinkFactoryContext context =
- TableSinkFactoryContext.replacePlaceholderAndCreate(
- catalogTables.get(0),
- ReadonlyConfig.fromConfig(sinkConfig),
- classLoader,
- ((TableSinkFactory) factory.get())
- .excludeTablePlaceholderReplaceKeys());
-
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
- SeaTunnelSink sink =
- ((TableSinkFactory)
factory.get()).createSink(context).createSink();
- sink.setJobContext(jobContext);
- return sink;
- }
- }
-
- public static boolean isFallback(Factory factory) {
- try {
- ((TableSinkFactory) factory).createSink(null);
- } catch (Exception e) {
- if (e instanceof UnsupportedOperationException
- && "The Factory has not been implemented and the
deprecated Plugin will be used."
- .equals(e.getMessage())) {
- return true;
- }
- }
- return false;
- }
-
- public static SeaTunnelSink fallbackCreateSink(
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
- PluginIdentifier pluginIdentifier,
- Config pluginConfig) {
- SeaTunnelSink source =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
- source.prepare(pluginConfig);
- return source;
- }
-
public static void ensureJobModeMatch(JobContext jobContext,
SeaTunnelSource source) {
if (jobContext.getJobMode() == JobMode.BATCH
&& source.getBoundedness()
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 6f24e1c3fe..4466844536 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -34,14 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
@@ -55,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
@@ -65,8 +62,6 @@ import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
- private static final String PLUGIN_TYPE = PluginType.SINK.getType();
-
protected SinkExecuteProcessor(
List<URL> jarPaths,
Config envConfig,
@@ -101,48 +96,27 @@ public class SinkExecuteProcessor
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
DataStreamTableInfo input =
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+ sinkPluginDiscovery::createPluginInstance;
for (int i = 0; i < plugins.size(); i++) {
+ Optional<? extends Factory> factory = plugins.get(i);
Config sinkConfig = pluginConfigs.get(i);
DataStreamTableInfo stream =
fromSourceTable(sinkConfig,
upstreamDataStreams).orElse(input);
- Optional<? extends Factory> factory = plugins.get(i);
- boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
- if (fallBack) {
- for (CatalogTable catalogTable : stream.getCatalogTables()) {
- SeaTunnelSink fallBackSink =
- fallbackCreateSink(
- sinkPluginDiscovery,
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
sinkConfig.getString(PLUGIN_NAME.key())),
- sinkConfig);
- fallBackSink.setJobContext(jobContext);
- SeaTunnelRowType sourceType =
catalogTable.getSeaTunnelRowType();
- fallBackSink.setTypeInfo(sourceType);
- handleSaveMode(fallBackSink);
- TableIdentifier tableId = catalogTable.getTableId();
- sinks.put(tableId.toTablePath(), fallBackSink);
- }
- } else {
- for (CatalogTable catalogTable : stream.getCatalogTables()) {
- SeaTunnelSink seaTunnelSink;
- TableSinkFactoryContext context =
-
TableSinkFactoryContext.replacePlaceholderAndCreate(
- catalogTable,
- ReadonlyConfig.fromConfig(sinkConfig),
- classLoader,
- ((TableSinkFactory) factory.get())
-
.excludeTablePlaceholderReplaceKeys());
-
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
- seaTunnelSink =
- ((TableSinkFactory)
factory.get()).createSink(context).createSink();
- seaTunnelSink.setJobContext(jobContext);
- handleSaveMode(seaTunnelSink);
- TableIdentifier tableId = catalogTable.getTableId();
- sinks.put(tableId.toTablePath(), seaTunnelSink);
- }
+ for (CatalogTable catalogTable : stream.getCatalogTables()) {
+ SeaTunnelSink sink =
+ FactoryUtil.createAndPrepareSink(
+ catalogTable,
+ ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader,
+ sinkConfig.getString(PLUGIN_NAME.key()),
+ fallbackCreateSink,
+ ((TableSinkFactory) (factory.orElse(null))));
+ sink.setJobContext(jobContext);
+ handleSaveMode(sink);
+ TableIdentifier tableId = catalogTable.getTableId();
+ sinks.put(tableId.toTablePath(), sink);
}
SeaTunnelSink sink =
tryGenerateMultiTableSink(
@@ -178,28 +152,6 @@ public class SinkExecuteProcessor
return FactoryUtil.createMultiTableSink(sinks, sinkConfig,
classLoader);
}
- public boolean isFallback(Factory factory) {
- try {
- ((TableSinkFactory) factory).createSink(null);
- } catch (Exception e) {
- if (e instanceof UnsupportedOperationException
- && "The Factory has not been implemented and the
deprecated Plugin will be used."
- .equals(e.getMessage())) {
- return true;
- }
- }
- return false;
- }
-
- public SeaTunnelSink fallbackCreateSink(
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
- PluginIdentifier pluginIdentifier,
- Config pluginConfig) {
- SeaTunnelSink source =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
- source.prepare(pluginConfig);
- return source;
- }
-
public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
if (seaTunnelSink instanceof SupportSaveMode) {
SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index d41bfe34ce..a82d2392e6 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -34,14 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
@@ -56,6 +52,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
@@ -66,8 +63,6 @@ import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
- private static final String PLUGIN_TYPE = PluginType.SINK.getType();
-
protected SinkExecuteProcessor(
List<URL> jarPaths,
Config envConfig,
@@ -102,48 +97,27 @@ public class SinkExecuteProcessor
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
DataStreamTableInfo input =
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+ sinkPluginDiscovery::createPluginInstance;
for (int i = 0; i < plugins.size(); i++) {
+ Optional<? extends Factory> factory = plugins.get(i);
Config sinkConfig = pluginConfigs.get(i);
DataStreamTableInfo stream =
fromSourceTable(sinkConfig,
upstreamDataStreams).orElse(input);
- Optional<? extends Factory> factory = plugins.get(i);
- boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
- if (fallBack) {
- for (CatalogTable catalogTable : stream.getCatalogTables()) {
- SeaTunnelSink fallBackSink =
- fallbackCreateSink(
- sinkPluginDiscovery,
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
sinkConfig.getString(PLUGIN_NAME.key())),
- sinkConfig);
- fallBackSink.setJobContext(jobContext);
- SeaTunnelRowType sourceType =
catalogTable.getSeaTunnelRowType();
- fallBackSink.setTypeInfo(sourceType);
- handleSaveMode(fallBackSink);
- TableIdentifier tableId = catalogTable.getTableId();
- sinks.put(tableId.toTablePath(), fallBackSink);
- }
- } else {
- for (CatalogTable catalogTable : stream.getCatalogTables()) {
- SeaTunnelSink seaTunnelSink;
- TableSinkFactoryContext context =
-
TableSinkFactoryContext.replacePlaceholderAndCreate(
- catalogTable,
- ReadonlyConfig.fromConfig(sinkConfig),
- classLoader,
- ((TableSinkFactory) factory.get())
-
.excludeTablePlaceholderReplaceKeys());
-
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
- seaTunnelSink =
- ((TableSinkFactory)
factory.get()).createSink(context).createSink();
- seaTunnelSink.setJobContext(jobContext);
- handleSaveMode(seaTunnelSink);
- TableIdentifier tableId = catalogTable.getTableId();
- sinks.put(tableId.toTablePath(), seaTunnelSink);
- }
+ for (CatalogTable catalogTable : stream.getCatalogTables()) {
+ SeaTunnelSink sink =
+ FactoryUtil.createAndPrepareSink(
+ catalogTable,
+ ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader,
+ sinkConfig.getString(PLUGIN_NAME.key()),
+ fallbackCreateSink,
+ ((TableSinkFactory) (factory.orElse(null))));
+ sink.setJobContext(jobContext);
+ handleSaveMode(sink);
+ TableIdentifier tableId = catalogTable.getTableId();
+ sinks.put(tableId.toTablePath(), sink);
}
SeaTunnelSink sink =
tryGenerateMultiTableSink(
@@ -184,28 +158,6 @@ public class SinkExecuteProcessor
return FactoryUtil.createMultiTableSink(sinks, sinkConfig,
classLoader);
}
- public boolean isFallback(Factory factory) {
- try {
- ((TableSinkFactory) factory).createSink(null);
- } catch (Exception e) {
- if (e instanceof UnsupportedOperationException
- && "The Factory has not been implemented and the
deprecated Plugin will be used."
- .equals(e.getMessage())) {
- return true;
- }
- }
- return false;
- }
-
- public SeaTunnelSink fallbackCreateSink(
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
- PluginIdentifier pluginIdentifier,
- Config pluginConfig) {
- SeaTunnelSink source =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
- source.prepare(pluginConfig);
- return source;
- }
-
public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
if (seaTunnelSink instanceof SupportSaveMode) {
SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 4e1de7c95d..d637f2256b 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -22,14 +22,16 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.core.starter.enums.PluginType;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.translation.flink.source.FlinkSource;
@@ -39,15 +41,19 @@ import
org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.extern.slf4j.Slf4j;
+import scala.Tuple2;
+import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.function.Function;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
+import static
org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
@Slf4j
@SuppressWarnings("unchecked,rawtypes")
@@ -95,11 +101,12 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
@Override
protected List<SourceTableInfo> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
- new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
-
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableSourceFactory.class,
ADD_URL_TO_CLASSLOADER);
+ SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
+ new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
+ Function<PluginIdentifier, SeaTunnelSource> fallbackCreateSource =
+ sourcePluginDiscovery::createPluginInstance;
List<SourceTableInfo> sources = new ArrayList<>();
Set<URL> jars = new HashSet<>();
@@ -109,14 +116,22 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
ENGINE_TYPE, PLUGIN_TYPE,
sourceConfig.getString(PLUGIN_NAME.key()));
jars.addAll(
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SourceTableInfo source =
- PluginUtil.createSource(
- factoryDiscovery,
- sourcePluginDiscovery,
- pluginIdentifier,
- sourceConfig,
- jobContext);
- sources.add(source);
+
+ Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> source =
+ FactoryUtil.createAndPrepareSource(
+ ReadonlyConfig.fromConfig(sourceConfig),
+ Thread.currentThread().getContextClassLoader(),
+ pluginIdentifier.getPluginName(),
+ fallbackCreateSource,
+ (TableSourceFactory)
+ factoryDiscovery
+
.createOptionalPluginInstance(pluginIdentifier)
+ .orElse(null));
+
+ source._1().setJobContext(jobContext);
+ ensureJobModeMatch(jobContext, source._1());
+
+ sources.add(new SourceTableInfo(source._1(), source._2()));
}
jarPaths.addAll(jars);
return sources;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index f2c20e8408..e9ee482e9c 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.spark;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
@@ -29,7 +30,6 @@ import
org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index d4f99d65f5..6e22576c91 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -29,12 +31,12 @@ import
org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -45,11 +47,14 @@ import org.apache.spark.sql.Row;
import java.net.URL;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
public class SinkExecuteProcessor
@@ -91,6 +96,8 @@ public class SinkExecuteProcessor
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
DatasetTableInfo input =
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
+ Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+ sinkPluginDiscovery::createPluginInstance;
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
DatasetTableInfo datasetTableInfo =
@@ -110,15 +117,25 @@ public class SinkExecuteProcessor
CommonOptions.PARALLELISM.defaultValue());
}
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(),
parallelism);
- Optional<? extends Factory> factory = plugins.get(i);
+ Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
+ datasetTableInfo.getCatalogTables().stream()
+ .forEach(
+ catalogTable -> {
+ SeaTunnelSink<Object, Object, Object, Object>
sink =
+ FactoryUtil.createAndPrepareSink(
+ catalogTable,
+
ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader,
+
sinkConfig.getString(PLUGIN_NAME.key()),
+ fallbackCreateSink,
+ null);
+ sink.setJobContext(jobContext);
+
sinks.put(catalogTable.getTableId().toTablePath(), sink);
+ });
+
SeaTunnelSink sink =
- PluginUtil.createSink(
- factory,
- sinkConfig,
- sinkPluginDiscovery,
- jobContext,
- datasetTableInfo.getCatalogTables(),
- classLoader);
+ tryGenerateMultiTableSink(
+ sinks, ReadonlyConfig.fromConfig(sinkConfig),
classLoader);
// TODO modify checkpoint location
handleSaveMode(sink);
String applicationId =
@@ -134,28 +151,6 @@ public class SinkExecuteProcessor
return null;
}
- public boolean isFallback(Factory factory) {
- try {
- ((TableSinkFactory) factory).createSink(null);
- } catch (Exception e) {
- if (e instanceof UnsupportedOperationException
- && "The Factory has not been implemented and the
deprecated Plugin will be used."
- .equals(e.getMessage())) {
- return true;
- }
- }
- return false;
- }
-
- public SeaTunnelSink fallbackCreateSink(
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
- PluginIdentifier pluginIdentifier,
- Config pluginConfig) {
- SeaTunnelSink source =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
- source.prepare(pluginConfig);
- return source;
- }
-
public void handleSaveMode(SeaTunnelSink sink) {
if (sink instanceof SupportSaveMode) {
Optional<SaveModeHandler> saveModeHandler =
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 790a20191d..dcd6a804b2 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.spark;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
@@ -29,7 +30,6 @@ import
org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index b66aaf7d86..74b0e1c1f1 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -29,12 +31,12 @@ import
org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -44,15 +46,21 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
+import lombok.extern.slf4j.Slf4j;
+
import java.net.URL;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+@Slf4j
public class SinkExecuteProcessor
extends SparkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
private static final String PLUGIN_TYPE = PluginType.SINK.getType();
@@ -92,6 +100,8 @@ public class SinkExecuteProcessor
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
DatasetTableInfo input =
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
+ Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+ sinkPluginDiscovery::createPluginInstance;
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
DatasetTableInfo datasetTableInfo =
@@ -110,15 +120,24 @@ public class SinkExecuteProcessor
CommonOptions.PARALLELISM.defaultValue());
}
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(),
parallelism);
- Optional<? extends Factory> factory = plugins.get(i);
+ Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
+ datasetTableInfo.getCatalogTables().stream()
+ .forEach(
+ catalogTable -> {
+ SeaTunnelSink<Object, Object, Object, Object>
sink =
+ FactoryUtil.createAndPrepareSink(
+ catalogTable,
+
ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader,
+
sinkConfig.getString(PLUGIN_NAME.key()),
+ fallbackCreateSink,
+ null);
+ sink.setJobContext(jobContext);
+
sinks.put(catalogTable.getTableId().toTablePath(), sink);
+ });
SeaTunnelSink sink =
- PluginUtil.createSink(
- factory,
- sinkConfig,
- sinkPluginDiscovery,
- jobContext,
- datasetTableInfo.getCatalogTables(),
- classLoader);
+ tryGenerateMultiTableSink(
+ sinks, ReadonlyConfig.fromConfig(sinkConfig),
classLoader);
// TODO modify checkpoint location
handleSaveMode(sink);
String applicationId =
@@ -135,28 +154,6 @@ public class SinkExecuteProcessor
return null;
}
- public boolean isFallback(Factory factory) {
- try {
- ((TableSinkFactory) factory).createSink(null);
- } catch (Exception e) {
- if (e instanceof UnsupportedOperationException
- && "The Factory has not been implemented and the
deprecated Plugin will be used."
- .equals(e.getMessage())) {
- return true;
- }
- }
- return false;
- }
-
- public SeaTunnelSink fallbackCreateSink(
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
- PluginIdentifier pluginIdentifier,
- Config pluginConfig) {
- SeaTunnelSink source =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
- source.prepare(pluginConfig);
- return source;
- }
-
public void handleSaveMode(SeaTunnelSink sink) {
if (sink instanceof SupportSaveMode) {
Optional<SaveModeHandler> saveModeHandler =
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 5f4a583d84..50b3a88814 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -23,21 +23,24 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import scala.Tuple2;
+
+import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
@@ -45,9 +48,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
+import static
org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
@SuppressWarnings("rawtypes")
public class SourceExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<SourceTableInfo> {
@@ -110,8 +115,9 @@ public class SourceExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<
@Override
protected List<SourceTableInfo> initializePlugins(List<? extends Config>
pluginConfigs) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new
SeaTunnelSourcePluginDiscovery();
- SeaTunnelFactoryDiscovery factoryDiscovery =
- new SeaTunnelFactoryDiscovery(TableSourceFactory.class);
+
+ Function<PluginIdentifier, SeaTunnelSource> fallbackCreateSource =
+ sourcePluginDiscovery::createPluginInstance;
List<SourceTableInfo> sources = new ArrayList<>();
Set<URL> jars = new HashSet<>();
@@ -121,14 +127,17 @@ public class SourceExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<
ENGINE_TYPE, PLUGIN_TYPE,
sourceConfig.getString(PLUGIN_NAME.key()));
jars.addAll(
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SourceTableInfo source =
- PluginUtil.createSource(
- factoryDiscovery,
- sourcePluginDiscovery,
- pluginIdentifier,
- sourceConfig,
- jobContext);
- sources.add(source);
+ Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> source =
+ FactoryUtil.createAndPrepareSource(
+ ReadonlyConfig.fromConfig(sourceConfig),
+ Thread.currentThread().getContextClassLoader(),
+ pluginIdentifier.getPluginName(),
+ fallbackCreateSource,
+ null);
+
+ source._1().setJobContext(jobContext);
+ ensureJobModeMatch(jobContext, source._1());
+ sources.add(new SourceTableInfo(source._1(), source._2()));
}
sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars));
return sources;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
index e85e2c56eb..d9de016446 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
@@ -21,6 +21,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -28,12 +32,16 @@ import
org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_INPUT;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
+@Slf4j
public abstract class SparkAbstractPluginExecuteProcessor<T>
implements PluginExecuteProcessor<DatasetTableInfo,
SparkRuntimeEnvironment> {
protected SparkRuntimeEnvironment sparkRuntimeEnvironment;
@@ -101,6 +109,19 @@ public abstract class
SparkAbstractPluginExecuteProcessor<T>
pluginInputIdentifier));
}
+ // if not support multi table, rollback
+ protected SeaTunnelSink tryGenerateMultiTableSink(
+ Map<TablePath, SeaTunnelSink> sinks,
+ ReadonlyConfig sinkConfig,
+ ClassLoader classLoader) {
+ if (sinks.values().stream().anyMatch(sink -> !(sink instanceof
SupportMultiTableSink))) {
+ log.info("Unsupported multi table sink api, rollback to sink
template");
+ // choose the first sink
+ return sinks.values().iterator().next();
+ }
+ return FactoryUtil.createMultiTableSink(sinks, sinkConfig,
classLoader);
+ }
+
private void registerTempView(String tableName, Dataset<Row> ds) {
ds.createOrReplaceTempView(tableName);
}
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java
index 1a2514fdca..b7b9adbccb 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.seatunnel.command;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.constants.PluginType;
@@ -25,7 +26,6 @@ import
org.apache.seatunnel.core.starter.exception.CommandExecuteException;
import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
import
org.apache.seatunnel.core.starter.seatunnel.args.ConnectorCheckCommandArgs;
import org.apache.seatunnel.plugin.discovery.PluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
index bb22567bcf..86f52b5f54 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
@@ -54,6 +54,10 @@ sink {
max_transaction_retry_time = 3
max_connection_timeout = 1
+ queryParamPosition = {
+ string = 0
+ int = 1
+ }
query = "unwind $batch as row create(n:BatchLabel) set n.name =
row.name,n.age = row.age"
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
index affc90283b..87b1a4e7cd 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
@@ -21,12 +21,12 @@ import
org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 2ec19cabc9..02a56b3b83 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -17,42 +17,15 @@
package org.apache.seatunnel.engine.core.parse;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
-import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
-import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.Data;
import lombok.NonNull;
-import scala.Serializable;
-import scala.Tuple2;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
@Data
public class JobConfigParser {
@@ -73,125 +46,10 @@ public class JobConfigParser {
this.isStartWithSavePoint = isStartWithSavePoint;
}
- public Tuple2<CatalogTable, Action> parseSource(
- Config config, JobConfig jobConfig, String tableId, int
parallelism) {
- ImmutablePair<SeaTunnelSource, Set<URL>> tuple =
- ConnectorInstanceLoader.loadSourceInstance(
- config, jobConfig.getJobContext(), commonPluginJars);
- final SeaTunnelSource source = tuple.getLeft();
- // old logic: prepare(initialization) -> set job context
- source.prepare(config);
- source.setJobContext(jobConfig.getJobContext());
- PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
- String actionName =
- createSourceActionName(0,
config.getString(CollectionConstants.PLUGIN_NAME));
- SourceAction action =
- new SourceAction(
- idGenerator.getNextId(),
- actionName,
- tuple.getLeft(),
- tuple.getRight(),
- new HashSet<>());
- action.setParallelism(parallelism);
- SeaTunnelRowType producedType = (SeaTunnelRowType)
tuple.getLeft().getProducedType();
- CatalogTable catalogTable = CatalogTableUtil.getCatalogTable(tableId,
producedType);
- return new Tuple2<>(catalogTable, action);
- }
-
- public List<SinkAction<?, ?, ?, ?>> parseSinks(
- int configIndex,
- List<List<Tuple2<CatalogTable, Action>>> inputVertices,
- Config sinkConfig,
- JobConfig jobConfig) {
- List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
- int spareParallelism =
inputVertices.get(0).get(0)._2().getParallelism();
- if (inputVertices.size() > 1) {
- // union
- Set<Action> inputActions =
- inputVertices.stream()
- .flatMap(Collection::stream)
- .map(Tuple2::_2)
-
.collect(Collectors.toCollection(LinkedHashSet::new));
- checkProducedTypeEquals(inputActions);
- SinkAction<?, ?, ?, ?> sinkAction =
- parseSink(
- configIndex,
- sinkConfig,
- jobConfig,
- spareParallelism,
- inputVertices
- .get(0)
- .get(0)
- ._1()
- .getTableSchema()
- .toPhysicalRowDataType(),
- inputActions);
- sinkActions.add(sinkAction);
- } else {
- // sink template
- for (Tuple2<CatalogTable, Action> tableTuple :
inputVertices.get(0)) {
- CatalogTable catalogTable = tableTuple._1();
- Action inputAction = tableTuple._2();
- int parallelism = inputAction.getParallelism();
- SinkAction<?, ?, ?, ?> sinkAction =
- parseSink(
- configIndex,
- sinkConfig,
- jobConfig,
- parallelism,
-
catalogTable.getTableSchema().toPhysicalRowDataType(),
- Collections.singleton(inputAction));
- sinkActions.add(sinkAction);
- }
- }
- return sinkActions;
- }
-
- private SinkAction<?, ?, ?, ?> parseSink(
- int configIndex,
- Config config,
- JobConfig jobConfig,
- int parallelism,
- SeaTunnelRowType rowType,
- Set<Action> inputActions) {
- final ImmutablePair<
- SeaTunnelSink<SeaTunnelRow, Serializable,
Serializable, Serializable>,
- Set<URL>>
- tuple =
- ConnectorInstanceLoader.loadSinkInstance(
- config, jobConfig.getJobContext(),
commonPluginJars);
- final SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> sink =
- tuple.getLeft();
- // old logic: prepare(initialization) -> set job context -> set row
type (There is a logical
- // judgment that depends on before and after, not a simple set)
- sink.prepare(config);
- sink.setJobContext(jobConfig.getJobContext());
- sink.setTypeInfo(rowType);
- if (!isStartWithSavePoint) {
- multipleTableJobConfigParser.handleSaveMode(sink);
- }
- final String actionName =
- createSinkActionName(configIndex,
tuple.getLeft().getPluginName());
- final SinkAction action =
- new SinkAction<>(
- idGenerator.getNextId(),
- actionName,
- new ArrayList<>(inputActions),
- sink,
- tuple.getRight(),
- new HashSet<>());
- action.setParallelism(parallelism);
- return action;
- }
-
static String createSourceActionName(int configIndex, String pluginName) {
return String.format("Source[%s]-%s", configIndex, pluginName);
}
- static String createSinkActionName(int configIndex, String pluginName) {
- return String.format("Sink[%s]-%s", configIndex, pluginName);
- }
-
static String createSinkActionName(int configIndex, String pluginName,
String table) {
return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 61df9abcfe..903db02f80 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
@@ -35,10 +36,7 @@ import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import
org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceCheckpoint;
-import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.Constants;
@@ -48,7 +46,6 @@ import
org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -64,7 +61,6 @@ import
org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.JobPipelineCheckpointData;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
@@ -95,7 +91,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -116,7 +111,6 @@ public class MultipleTableJobConfigParser {
private final ReadonlyConfig envOptions;
- private final JobConfigParser fallbackParser;
private final boolean isStartWithSavePoint;
private final List<JobPipelineCheckpointData> pipelineCheckpoints;
@@ -166,8 +160,6 @@ public class MultipleTableJobConfigParser {
this.isStartWithSavePoint = isStartWithSavePoint;
this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
- this.fallbackParser =
- new JobConfigParser(idGenerator, commonPluginJars, this,
isStartWithSavePoint);
this.pipelineCheckpoints = pipelineCheckpoints;
}
@@ -184,8 +176,6 @@ public class MultipleTableJobConfigParser {
this.isStartWithSavePoint = isStartWithSavePoint;
this.seaTunnelJobConfig = seaTunnelJobConfig;
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
- this.fallbackParser =
- new JobConfigParser(idGenerator, commonPluginJars, this,
isStartWithSavePoint);
this.pipelineCheckpoints = pipelineCheckpoints;
}
@@ -347,31 +337,6 @@ public class MultipleTableJobConfigParser {
log.info("add common jar in plugins :{}", commonPluginJars);
}
- private static <T extends Factory> boolean isFallback(
- ClassLoader classLoader,
- Class<T> factoryClass,
- String factoryId,
- Consumer<T> virtualCreator) {
- Optional<T> factory =
- FactoryUtil.discoverOptionalFactory(classLoader, factoryClass,
factoryId);
- if (!factory.isPresent()) {
- return true;
- }
- try {
- virtualCreator.accept(factory.get());
- } catch (Exception e) {
- if (e instanceof UnsupportedOperationException
- && "The Factory has not been implemented and the
deprecated Plugin will be used."
- .equals(e.getMessage())) {
- log.warn(
- "The Factory has not been implemented and the
deprecated Plugin will be used.");
- return true;
- }
- log.debug(ExceptionUtils.getMessage(e));
- }
- return false;
- }
-
private int getParallelism(ReadonlyConfig config) {
return Math.max(
1,
@@ -388,18 +353,12 @@ public class MultipleTableJobConfigParser {
final int parallelism = getParallelism(readonlyConfig);
- boolean fallback =
- isFallback(
- classLoader,
- TableSourceFactory.class,
- factoryId,
- (factory) -> factory.createSource(null));
-
- if (fallback) {
- Tuple2<CatalogTable, Action> tuple =
- fallbackParser.parseSource(sourceConfig, jobConfig,
tableId, parallelism);
- return new Tuple2<>(tableId, Collections.singletonList(tuple));
- }
+ Function<PluginIdentifier, SeaTunnelSource> fallbackCreateSource =
+ pluginIdentifier -> {
+ SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
+ new SeaTunnelSourcePluginDiscovery();
+ return
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
+ };
Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> tuple2;
if (isStartWithSavePoint && pipelineCheckpoints != null &&
!pipelineCheckpoints.isEmpty()) {
@@ -407,9 +366,16 @@ public class MultipleTableJobConfigParser {
getSourceCheckpoint(configIndex, factoryId);
tuple2 =
FactoryUtil.restoreAndPrepareSource(
- readonlyConfig, classLoader, factoryId,
checkpoint);
+ readonlyConfig,
+ classLoader,
+ factoryId,
+ checkpoint,
+ fallbackCreateSource,
+ null);
} else {
- tuple2 = FactoryUtil.createAndPrepareSource(readonlyConfig,
classLoader, factoryId);
+ tuple2 =
+ FactoryUtil.createAndPrepareSource(
+ readonlyConfig, classLoader, factoryId,
fallbackCreateSource, null);
}
Set<URL> factoryUrls = new HashSet<>();
@@ -591,16 +557,6 @@ public class MultipleTableJobConfigParser {
}
}
- boolean fallback =
- isFallback(
- classLoader,
- TableSinkFactory.class,
- factoryId,
- (factory) -> factory.createSink(null));
- if (fallback) {
- return fallbackParser.parseSinks(configIndex, inputVertices,
sinkConfig, jobConfig);
- }
-
// get jar urls
Set<URL> jarUrls = new HashSet<>();
jarUrls.addAll(getSinkPluginJarPaths(sinkConfig));
@@ -702,9 +658,22 @@ public class MultipleTableJobConfigParser {
String factoryId,
int parallelism,
int configIndex) {
+
+ Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+ pluginIdentifier -> {
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
+ new SeaTunnelSinkPluginDiscovery();
+ return
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+ };
+
SeaTunnelSink<?, ?, ?, ?> sink =
FactoryUtil.createAndPrepareSink(
- catalogTable, readonlyConfig, classLoader, factoryId);
+ catalogTable,
+ readonlyConfig,
+ classLoader,
+ factoryId,
+ fallbackCreateSink,
+ null);
sink.setJobContext(jobConfig.getJobContext());
SinkConfig actionConfig = new
SinkConfig(catalogTable.getTableId().toTablePath());
long id = idGenerator.getNextId();
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 4b62895f18..a946700c75 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index e765822af3..0d631745eb 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.plugin.discovery;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
index 9fe8717488..a536f3a446 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.plugin.discovery.seatunnel;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.commons.lang3.StringUtils;
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
index cef4f42ab5..145b0a04b4 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.plugin.discovery.seatunnel;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -24,7 +25,6 @@ import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.commons.lang3.tuple.ImmutableTriple;
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
index e1fbce74bb..a224066f2d 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.plugin.discovery.seatunnel;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -24,7 +25,6 @@ import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.commons.lang3.tuple.ImmutableTriple;
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
index 606cd0d7ca..88b586d137 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.plugin.discovery.seatunnel;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
@@ -24,7 +25,6 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.commons.lang3.tuple.ImmutableTriple;
diff --git
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
index e4cffe8780..5e8f4001aa 100644
---
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
+++
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.plugin.discovery;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.PluginType;
diff --git
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
index 1bc62981e8..e8511d524b 100644
---
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
+++
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -19,10 +19,10 @@ package org.apache.seatunnel.plugin.discovery.seatunnel;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;