This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f083a86c0f [Improve] Refactor Spark/Flink execution processor (#5595)
f083a86c0f is described below
commit f083a86c0f6a2fe14ce52de1baec059d27ddbcf3
Author: Jia Fan <[email protected]>
AuthorDate: Mon Oct 16 10:44:37 2023 +0800
[Improve] Refactor Spark/Flink execution processor (#5595)
---
.../api/table/catalog/CatalogTableUtil.java | 19 +++
.../seatunnel/api/table/factory/FactoryUtil.java | 19 +--
.../core/starter/execution/PluginUtil.java | 176 +++++++++++++++++++++
.../core/starter/execution/SourceTableInfo.java | 35 ++++
.../flink/execution/SinkExecuteProcessor.java | 133 ++++++++++------
.../flink/execution/DataStreamTableInfo.java | 37 +++++
.../FlinkAbstractPluginExecuteProcessor.java | 28 +++-
.../starter/flink/execution/FlinkExecution.java | 10 +-
.../flink/execution/SinkExecuteProcessor.java | 135 ++++++++++------
.../flink/execution/SourceExecuteProcessor.java | 59 ++++---
.../flink/execution/TransformExecuteProcessor.java | 95 +++++------
.../spark/execution/SinkExecuteProcessor.java | 110 +++++++++----
.../starter/spark/execution/DatasetTableInfo.java | 37 +++++
.../spark/execution/SinkExecuteProcessor.java | 110 +++++++++----
.../spark/execution/SourceExecuteProcessor.java | 49 ++++--
.../SparkAbstractPluginExecuteProcessor.java | 32 +++-
.../starter/spark/execution/SparkExecution.java | 11 +-
.../spark/execution/TransformExecuteProcessor.java | 76 +++++----
.../engine/core/parse/ConfigParserUtil.java | 2 +-
.../engine/core/parse/JobConfigParser.java | 6 +-
.../core/parse/MultipleTableJobConfigParser.java | 22 +--
.../plugin/discovery/AbstractPluginDiscovery.java | 27 +++-
.../plugin/discovery/PluginDiscovery.java | 19 +++
.../seatunnel/SeaTunnelFactoryDiscovery.java | 67 ++++++++
24 files changed, 959 insertions(+), 355 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 557759ef3e..880d97ca46 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -190,6 +190,25 @@ public class CatalogTableUtil implements Serializable {
}
}
+ public static List<CatalogTable> convertDataTypeToCatalogTables(
+ SeaTunnelDataType<?> seaTunnelDataType, String tableId) {
+ List<CatalogTable> catalogTables;
+ if (seaTunnelDataType instanceof MultipleRowType) {
+ catalogTables = new ArrayList<>();
+ for (String id : ((MultipleRowType)
seaTunnelDataType).getTableIds()) {
+ catalogTables.add(
+ CatalogTableUtil.getCatalogTable(
+ id, ((MultipleRowType)
seaTunnelDataType).getRowType(id)));
+ }
+ } else {
+ catalogTables =
+ Collections.singletonList(
+ CatalogTableUtil.getCatalogTable(
+ tableId, (SeaTunnelRowType)
seaTunnelDataType));
+ }
+ return catalogTables;
+ }
+
public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
if (readonlyConfig.get(TableSchemaOptions.SCHEMA) == null) {
throw new RuntimeException(
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 48ed785c39..9892e1b8bc 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -32,9 +32,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSource;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.slf4j.Logger;
@@ -62,7 +60,7 @@ public final class FactoryUtil {
private static final Logger LOG =
LoggerFactory.getLogger(FactoryUtil.class);
- static final String DEFAULT_ID = "default-identifier";
+ public static final String DEFAULT_ID = "default-identifier";
public static <T, SplitT extends SourceSplit, StateT extends Serializable>
List<Tuple2<SeaTunnelSource<T, SplitT, StateT>,
List<CatalogTable>>>
@@ -86,19 +84,8 @@ public final class FactoryUtil {
SeaTunnelDataType<T> seaTunnelDataType =
source.getProducedType();
final String tableId =
options.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
- if (seaTunnelDataType instanceof MultipleRowType) {
- catalogTables = new ArrayList<>();
- for (String id : ((MultipleRowType)
seaTunnelDataType).getTableIds()) {
- catalogTables.add(
- CatalogTableUtil.getCatalogTable(
- id, ((MultipleRowType)
seaTunnelDataType).getRowType(id)));
- }
- } else {
- catalogTables =
- Collections.singletonList(
- CatalogTableUtil.getCatalogTable(
- tableId, (SeaTunnelRowType)
seaTunnelDataType));
- }
+ catalogTables =
+
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
}
LOG.info(
"get the CatalogTable from source {}: {}",
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
new file mode 100644
index 0000000000..0b1f34d3c1
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.execution;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryException;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
+
+import com.google.common.collect.Lists;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+
+/** The util used for Spark/Flink to create to SeaTunnelSource etc. */
+public class PluginUtil {
+
+ protected static final String ENGINE_TYPE = "seatunnel";
+
+ public static SourceTableInfo createSource(
+ SeaTunnelFactoryDiscovery factoryDiscovery,
+ SeaTunnelSourcePluginDiscovery sourcePluginDiscovery,
+ PluginIdentifier pluginIdentifier,
+ Config pluginConfig,
+ JobContext jobContext) {
+ // get current thread classloader
+ ClassLoader classLoader =
+ Thread.currentThread()
+ .getContextClassLoader(); // try to find factory of
this plugin
+
+ final ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(pluginConfig);
+ // try to find table source factory
+ final Optional<Factory> sourceFactory =
+
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
+ final boolean fallback = isFallback(sourceFactory);
+ SeaTunnelSource source;
+ if (fallback) {
+ source = fallbackCreate(sourcePluginDiscovery, pluginIdentifier,
pluginConfig);
+ } else {
+ // create source with source factory
+ TableSourceFactoryContext context =
+ new TableSourceFactoryContext(readonlyConfig, classLoader);
+
ConfigValidator.of(context.getOptions()).validate(sourceFactory.get().optionRule());
+ TableSource tableSource =
+ ((TableSourceFactory)
sourceFactory.get()).createSource(context);
+ source = tableSource.createSource();
+ }
+ source.setJobContext(jobContext);
+ ensureJobModeMatch(jobContext, source);
+ List<CatalogTable> catalogTables;
+ try {
+ catalogTables = source.getProducedCatalogTables();
+ } catch (UnsupportedOperationException e) {
+ // TODO remove it when all connector use `getProducedCatalogTables`
+ SeaTunnelDataType<?> seaTunnelDataType = source.getProducedType();
+ final String tableId =
+
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+ catalogTables =
+
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
+ }
+
+ if (catalogTables.size() != 1) {
+ throw new SeaTunnelException(
+ String.format("Unsupported table number: %d on flink",
catalogTables.size()));
+ }
+ return new SourceTableInfo(source, catalogTables);
+ }
+
+ private static boolean isFallback(Optional<Factory> factory) {
+ if (!factory.isPresent()) {
+ return true;
+ }
+ try {
+ ((TableSourceFactory) factory.get()).createSource(null);
+ } catch (Exception e) {
+ if (e instanceof UnsupportedOperationException
+ && "The Factory has not been implemented and the
deprecated Plugin will be used."
+ .equals(e.getMessage())) {
+ return true;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private static SeaTunnelSource fallbackCreate(
+ SeaTunnelSourcePluginDiscovery sourcePluginDiscovery,
+ PluginIdentifier pluginIdentifier,
+ Config pluginConfig) {
+ SeaTunnelSource source =
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
+ source.prepare(pluginConfig);
+ return source;
+ }
+
+ public static TableTransformFactory createTransformFactory(
+ SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
+ Config transformConfig,
+ List<URL> pluginJars) {
+ PluginIdentifier pluginIdentifier =
+ PluginIdentifier.of(
+ ENGINE_TYPE, "transform",
transformConfig.getString(PLUGIN_NAME.key()));
+ final ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(transformConfig);
+ final String factoryId = readonlyConfig.get(PLUGIN_NAME);
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ final TableTransformFactory factory =
+ discoverFactory(classLoader, TableTransformFactory.class,
factoryId);
+ pluginJars.addAll(
+
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+ return factory;
+ }
+
+ public static Optional<? extends Factory> createSinkFactory(
+ SeaTunnelFactoryDiscovery factoryDiscovery,
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+ Config sinkConfig,
+ List<URL> pluginJars) {
+ PluginIdentifier pluginIdentifier =
+ PluginIdentifier.of(ENGINE_TYPE, "sink",
sinkConfig.getString(PLUGIN_NAME.key()));
+ pluginJars.addAll(
+
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+ try {
+ return
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
+ } catch (FactoryException e) {
+ return Optional.empty();
+ }
+ }
+
+ public static void ensureJobModeMatch(JobContext jobContext,
SeaTunnelSource source) {
+ if (jobContext.getJobMode() == JobMode.BATCH
+ && source.getBoundedness()
+ ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "'%s' source don't support off-line job.",
source.getPluginName()));
+ }
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
new file mode 100644
index 0000000000..529b9b4207
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.execution;
+
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class SourceTableInfo {
+
+ private SeaTunnelSource source;
+
+ private List<CatalogTable> catalogTables;
+}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index cf0ad1e7be..8ef78a9762 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,32 +21,35 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.types.Row;
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
import java.net.URL;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
public class SinkExecuteProcessor
- extends FlinkAbstractPluginExecuteProcessor<
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>> {
+ extends FlinkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
private static final String PLUGIN_TYPE = PluginType.SINK.getType();
@@ -56,60 +59,67 @@ public class SinkExecuteProcessor
}
@Override
- protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>>
- initializePlugins(List<URL> jarPaths, List<? extends Config>
pluginConfigs) {
+ protected List<Optional<? extends Factory>> initializePlugins(
+ List<URL> jarPaths, List<? extends Config> pluginConfigs) {
+ SeaTunnelFactoryDiscovery factoryDiscovery =
+ new SeaTunnelFactoryDiscovery(TableSinkFactory.class,
ADD_URL_TO_CLASSLOADER);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
- List<URL> pluginJars = new ArrayList<>();
- List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>> sinks =
- pluginConfigs.stream()
- .map(
- sinkConfig -> {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
sinkConfig.getString(PLUGIN_NAME));
- pluginJars.addAll(
-
sinkPluginDiscovery.getPluginJarPaths(
-
Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSink<
- SeaTunnelRow,
- Serializable,
- Serializable,
- Serializable>
- seaTunnelSink =
-
sinkPluginDiscovery.createPluginInstance(
- pluginIdentifier);
- seaTunnelSink.prepare(sinkConfig);
- seaTunnelSink.setJobContext(jobContext);
- return seaTunnelSink;
- })
- .distinct()
- .collect(Collectors.toList());
- jarPaths.addAll(pluginJars);
- return sinks;
+ return pluginConfigs.stream()
+ .map(
+ sinkConfig ->
+ PluginUtil.createSinkFactory(
+ factoryDiscovery,
+ sinkPluginDiscovery,
+ sinkConfig,
+ jarPaths))
+ .distinct()
+ .collect(Collectors.toList());
}
@Override
- public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams)
+ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo>
upstreamDataStreams)
throws TaskExecuteException {
- DataStream<Row> input = upstreamDataStreams.get(0);
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
+ new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
+ DataStreamTableInfo input = upstreamDataStreams.get(0);
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> seaTunnelSink =
- plugins.get(i);
- DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
- SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
- seaTunnelSink.setTypeInfo(sourceType);
- if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
+ DataStreamTableInfo stream =
+ fromSourceTable(sinkConfig,
upstreamDataStreams).orElse(input);
+ Optional<? extends Factory> factory = plugins.get(i);
+ boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
+ SeaTunnelSink sink;
+ if (fallBack) {
+ sink =
+ fallbackCreateSink(
+ sinkPluginDiscovery,
+ PluginIdentifier.of(
+ ENGINE_TYPE,
+ PLUGIN_TYPE,
+
sinkConfig.getString(PLUGIN_NAME.key())),
+ sinkConfig);
+ sink.setJobContext(jobContext);
+ SeaTunnelRowType sourceType = initSourceType(sinkConfig,
stream.getDataStream());
+ sink.setTypeInfo(sourceType);
+ } else {
+ TableSinkFactoryContext context =
+ new TableSinkFactoryContext(
+ stream.getCatalogTable(),
+ ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader);
+
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+ sink = ((TableSinkFactory)
factory.get()).createSink(context).createSink();
+ sink.setJobContext(jobContext);
+ }
+ if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+ SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
DataSaveMode dataSaveMode =
saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
DataStreamSink<Row> dataStreamSink =
- stream.sinkTo(new FlinkSink<>(seaTunnelSink))
- .name(seaTunnelSink.getPluginName());
+ stream.getDataStream().sinkTo(new
FlinkSink<>(sink)).name(sink.getPluginName());
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
@@ -118,4 +128,27 @@ public class SinkExecuteProcessor
// the sink is the last stream
return null;
}
+
+ public boolean isFallback(Factory factory) {
+ try {
+ ((TableSinkFactory) factory).createSink(null);
+ } catch (Exception e) {
+ if (e instanceof UnsupportedOperationException
+ && "The Factory has not been implemented and the
deprecated Plugin will be used."
+ .equals(e.getMessage())) {
+ return true;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public SeaTunnelSink fallbackCreateSink(
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+ PluginIdentifier pluginIdentifier,
+ Config pluginConfig) {
+ SeaTunnelSink source =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+ source.prepare(pluginConfig);
+ return source;
+ }
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
new file mode 100644
index 0000000000..7b158ee60b
--- /dev/null
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink.execution;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class DataStreamTableInfo {
+
+ private DataStream<Row> dataStream;
+
+ private CatalogTable catalogTable;
+
+ private String tableName;
+}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index ed8b72a8f0..44b2f914fc 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
@@ -41,9 +42,9 @@ import java.util.function.BiConsumer;
import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
public abstract class FlinkAbstractPluginExecuteProcessor<T>
- implements PluginExecuteProcessor<DataStream<Row>,
FlinkRuntimeEnvironment> {
+ implements PluginExecuteProcessor<DataStreamTableInfo,
FlinkRuntimeEnvironment> {
protected static final String ENGINE_TYPE = "seatunnel";
- protected static final String PLUGIN_NAME = "plugin_name";
+ protected static final String PLUGIN_NAME_KEY = "plugin_name";
protected static final String SOURCE_TABLE_NAME = "source_table_name";
protected static HashMap<String, Boolean> isAppendMap = new HashMap<>();
@@ -78,15 +79,30 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
this.flinkRuntimeEnvironment = flinkRuntimeEnvironment;
}
- protected Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
+ protected Optional<DataStreamTableInfo> fromSourceTable(
+ Config pluginConfig, List<DataStreamTableInfo>
upstreamDataStreams) {
if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
StreamTableEnvironment tableEnvironment =
flinkRuntimeEnvironment.getStreamTableEnvironment();
String tableName = pluginConfig.getString(SOURCE_TABLE_NAME);
Table table = tableEnvironment.from(tableName);
- return Optional.ofNullable(
- TableUtil.tableToDataStream(
- tableEnvironment, table,
isAppendMap.getOrDefault(tableName, true)));
+ DataStreamTableInfo dataStreamTableInfo =
+ upstreamDataStreams.stream()
+ .filter(info ->
tableName.equals(info.getTableName()))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new SeaTunnelException(
+ String.format(
+ "table %s not
found", tableName)));
+ return Optional.of(
+ new DataStreamTableInfo(
+ TableUtil.tableToDataStream(
+ tableEnvironment,
+ table,
+ isAppendMap.getOrDefault(tableName, true)),
+ dataStreamTableInfo.getCatalogTable(),
+ tableName));
}
return Optional.empty();
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 7c56cd68b1..3e079db3a3 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -35,8 +35,6 @@ import
org.apache.seatunnel.core.starter.execution.TaskExecution;
import org.apache.seatunnel.core.starter.flink.FlinkStarter;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
import lombok.extern.slf4j.Slf4j;
@@ -56,11 +54,11 @@ import java.util.stream.Stream;
@Slf4j
public class FlinkExecution implements TaskExecution {
private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
- private final PluginExecuteProcessor<DataStream<Row>,
FlinkRuntimeEnvironment>
+ private final PluginExecuteProcessor<DataStreamTableInfo,
FlinkRuntimeEnvironment>
sourcePluginExecuteProcessor;
- private final PluginExecuteProcessor<DataStream<Row>,
FlinkRuntimeEnvironment>
+ private final PluginExecuteProcessor<DataStreamTableInfo,
FlinkRuntimeEnvironment>
transformPluginExecuteProcessor;
- private final PluginExecuteProcessor<DataStream<Row>,
FlinkRuntimeEnvironment>
+ private final PluginExecuteProcessor<DataStreamTableInfo,
FlinkRuntimeEnvironment>
sinkPluginExecuteProcessor;
private final List<URL> jarPaths;
@@ -104,7 +102,7 @@ public class FlinkExecution implements TaskExecution {
@Override
public void execute() throws TaskExecuteException {
- List<DataStream<Row>> dataStreams = new ArrayList<>();
+ List<DataStreamTableInfo> dataStreams = new ArrayList<>();
dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
sinkPluginExecuteProcessor.execute(dataStreams);
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 340351d1d4..2e6b742d67 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,33 +21,36 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
import org.apache.flink.types.Row;
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
import java.net.URL;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
public class SinkExecuteProcessor
- extends FlinkAbstractPluginExecuteProcessor<
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>> {
+ extends FlinkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
private static final String PLUGIN_TYPE = PluginType.SINK.getType();
@@ -57,60 +60,69 @@ public class SinkExecuteProcessor
}
@Override
- protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>>
- initializePlugins(List<URL> jarPaths, List<? extends Config>
pluginConfigs) {
+ protected List<Optional<? extends Factory>> initializePlugins(
+ List<URL> jarPaths, List<? extends Config> pluginConfigs) {
+ SeaTunnelFactoryDiscovery factoryDiscovery =
+ new SeaTunnelFactoryDiscovery(TableSinkFactory.class,
ADD_URL_TO_CLASSLOADER);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
- List<URL> pluginJars = new ArrayList<>();
- List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>> sinks =
- pluginConfigs.stream()
- .map(
- sinkConfig -> {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
sinkConfig.getString(PLUGIN_NAME));
- pluginJars.addAll(
-
sinkPluginDiscovery.getPluginJarPaths(
-
Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSink<
- SeaTunnelRow,
- Serializable,
- Serializable,
- Serializable>
- seaTunnelSink =
-
sinkPluginDiscovery.createPluginInstance(
- pluginIdentifier);
- seaTunnelSink.prepare(sinkConfig);
- seaTunnelSink.setJobContext(jobContext);
- return seaTunnelSink;
- })
- .distinct()
- .collect(Collectors.toList());
- jarPaths.addAll(pluginJars);
- return sinks;
+ return pluginConfigs.stream()
+ .map(
+ sinkConfig ->
+ PluginUtil.createSinkFactory(
+ factoryDiscovery,
+ sinkPluginDiscovery,
+ sinkConfig,
+ jarPaths))
+ .distinct()
+ .collect(Collectors.toList());
}
@Override
- public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams)
+ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo>
upstreamDataStreams)
throws TaskExecuteException {
- DataStream<Row> input = upstreamDataStreams.get(0);
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
+ new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
+ DataStreamTableInfo input = upstreamDataStreams.get(0);
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> seaTunnelSink =
- plugins.get(i);
- DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
- SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
- seaTunnelSink.setTypeInfo(sourceType);
- if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
+ DataStreamTableInfo stream =
+ fromSourceTable(sinkConfig,
upstreamDataStreams).orElse(input);
+ Optional<? extends Factory> factory = plugins.get(i);
+ boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
+ SeaTunnelSink sink;
+ if (fallBack) {
+ sink =
+ fallbackCreateSink(
+ sinkPluginDiscovery,
+ PluginIdentifier.of(
+ ENGINE_TYPE,
+ PLUGIN_TYPE,
+
sinkConfig.getString(PLUGIN_NAME.key())),
+ sinkConfig);
+ sink.setJobContext(jobContext);
+ SeaTunnelRowType sourceType = initSourceType(sinkConfig,
stream.getDataStream());
+ sink.setTypeInfo(sourceType);
+ } else {
+ TableSinkFactoryContext context =
+ new TableSinkFactoryContext(
+ stream.getCatalogTable(),
+ ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader);
+
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+ sink = ((TableSinkFactory)
factory.get()).createSink(context).createSink();
+ sink.setJobContext(jobContext);
+ }
+ if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+ SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
DataSaveMode dataSaveMode =
saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
DataStreamSink<Row> dataStreamSink =
- stream.sinkTo(SinkV1Adapter.wrap(new
FlinkSink<>(seaTunnelSink)))
- .name(seaTunnelSink.getPluginName());
+ stream.getDataStream()
+ .sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(sink)))
+ .name(sink.getPluginName());
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
@@ -119,4 +131,27 @@ public class SinkExecuteProcessor
// the sink is the last stream
return null;
}
+
+ public boolean isFallback(Factory factory) {
+ try {
+ ((TableSinkFactory) factory).createSink(null);
+ } catch (Exception e) {
+ if (e instanceof UnsupportedOperationException
+ && "The Factory has not been implemented and the
deprecated Plugin will be used."
+ .equals(e.getMessage())) {
+ return true;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public SeaTunnelSink fallbackCreateSink(
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+ PluginIdentifier pluginIdentifier,
+ Config pluginConfig) {
+ SeaTunnelSink source =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+ source.prepare(pluginConfig);
+ return source;
+ }
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 2f5e3a63ba..5636d231c5 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -23,11 +23,14 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.enums.PluginType;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
+import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import
org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction;
import
org.apache.seatunnel.translation.flink.source.SeaTunnelCoordinatedSource;
@@ -35,7 +38,6 @@ import
org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -43,6 +45,7 @@ import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.types.Row;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.ArrayList;
@@ -51,8 +54,12 @@ import java.util.List;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
-public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<SeaTunnelSource> {
+@Slf4j
+@SuppressWarnings("unchecked,rawtypes")
+public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<SourceTableInfo> {
private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
private Config envConfigs;
@@ -62,12 +69,13 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
}
@Override
- public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams) {
+ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo>
upstreamDataStreams) {
StreamExecutionEnvironment executionEnvironment =
flinkRuntimeEnvironment.getStreamExecutionEnvironment();
- List<DataStream<Row>> sources = new ArrayList<>();
+ List<DataStreamTableInfo> sources = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
- SeaTunnelSource internalSource = plugins.get(i);
+ SourceTableInfo sourceTableInfo = plugins.get(i);
+ SeaTunnelSource internalSource = sourceTableInfo.getSource();
Config pluginConfig = pluginConfigs.get(i);
BaseSeaTunnelSourceFunction sourceFunction;
if (internalSource instanceof SupportCoordinate) {
@@ -94,7 +102,13 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
sourceStream.setParallelism(parallelism);
}
registerResultTable(pluginConfig, sourceStream);
- sources.add(sourceStream);
+ sources.add(
+ new DataStreamTableInfo(
+ sourceStream,
+ sourceTableInfo.getCatalogTables().get(0),
+ pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+ ?
pluginConfig.getString(RESULT_TABLE_NAME.key())
+ : null));
}
return sources;
}
@@ -125,31 +139,30 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
}
@Override
- protected List<SeaTunnelSource> initializePlugins(
+ protected List<SourceTableInfo> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
- List<SeaTunnelSource> sources = new ArrayList<>();
+
+ SeaTunnelFactoryDiscovery factoryDiscovery =
+ new SeaTunnelFactoryDiscovery(TableSourceFactory.class,
ADD_URL_TO_CLASSLOADER);
+
+ List<SourceTableInfo> sources = new ArrayList<>();
Set<URL> jars = new HashSet<>();
for (Config sourceConfig : pluginConfigs) {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(
- ENGINE_TYPE, PLUGIN_TYPE,
sourceConfig.getString(PLUGIN_NAME));
+ ENGINE_TYPE, PLUGIN_TYPE,
sourceConfig.getString(PLUGIN_NAME.key()));
jars.addAll(
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSource seaTunnelSource =
-
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
- seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setJobContext(jobContext);
- if (jobContext.getJobMode() == JobMode.BATCH
- && seaTunnelSource.getBoundedness()
- ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
- throw new UnsupportedOperationException(
- String.format(
- "'%s' source don't support off-line job.",
- seaTunnelSource.getPluginName()));
- }
- sources.add(seaTunnelSource);
+ SourceTableInfo source =
+ PluginUtil.createSource(
+ factoryDiscovery,
+ sourcePluginDiscovery,
+ pluginIdentifier,
+ sourceConfig,
+ jobContext);
+ sources.add(source);
}
jarPaths.addAll(jars);
return sources;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 0dc36c62b0..810f6ba32b 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -20,11 +20,15 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
@@ -35,17 +39,15 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
-import com.google.common.collect.Lists;
-
import java.net.URL;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
-public class TransformExecuteProcessor
- extends FlinkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
- private static final String PLUGIN_TYPE = "transform";
+public class TransformExecuteProcessor
+ extends FlinkAbstractPluginExecuteProcessor<TableTransformFactory> {
protected TransformExecuteProcessor(
List<URL> jarPaths, List<? extends Config> pluginConfigs,
JobContext jobContext) {
@@ -53,65 +55,64 @@ public class TransformExecuteProcessor
}
@Override
- protected List<SeaTunnelTransform> initializePlugins(
+ protected List<TableTransformFactory> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
new SeaTunnelTransformPluginDiscovery();
- List<URL> pluginJars = new ArrayList<>();
- List<SeaTunnelTransform> transforms =
- pluginConfigs.stream()
- .map(
- transformConfig -> {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
transformConfig.getString(PLUGIN_NAME));
- List<URL> pluginJarPaths =
-
transformPluginDiscovery.getPluginJarPaths(
-
Lists.newArrayList(pluginIdentifier));
- SeaTunnelTransform<?> seaTunnelTransform =
-
transformPluginDiscovery.createPluginInstance(
- pluginIdentifier);
- jarPaths.addAll(pluginJarPaths);
-
seaTunnelTransform.prepare(transformConfig);
-
seaTunnelTransform.setJobContext(jobContext);
- return seaTunnelTransform;
- })
- .distinct()
- .collect(Collectors.toList());
- jarPaths.addAll(pluginJars);
- return transforms;
+
+ return pluginConfigs.stream()
+ .map(
+ transformConfig ->
+ PluginUtil.createTransformFactory(
+ transformPluginDiscovery,
transformConfig, jarPaths))
+ .distinct()
+ .collect(Collectors.toList());
}
@Override
- public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams)
+ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo>
upstreamDataStreams)
throws TaskExecuteException {
if (plugins.isEmpty()) {
return upstreamDataStreams;
}
- DataStream<Row> input = upstreamDataStreams.get(0);
- List<DataStream<Row>> result = new ArrayList<>();
+ DataStreamTableInfo input = upstreamDataStreams.get(0);
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
for (int i = 0; i < plugins.size(); i++) {
try {
- SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
- DataStream<Row> stream =
fromSourceTable(pluginConfig).orElse(input);
- SeaTunnelRowType sourceType = initSourceType(pluginConfig,
stream);
- transform.setTypeInfo(sourceType);
- input = flinkTransform(sourceType, transform, stream);
- stageType(pluginConfig, (SeaTunnelRowType)
transform.getProducedType());
- registerResultTable(pluginConfig, input);
- result.add(input);
+ DataStreamTableInfo stream =
+ fromSourceTable(pluginConfig,
upstreamDataStreams).orElse(input);
+ TableTransformFactory factory = plugins.get(i);
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
+
Collections.singletonList(stream.getCatalogTable()),
+ ReadonlyConfig.fromConfig(pluginConfig),
+ classLoader);
+
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
+ SeaTunnelTransform transform =
factory.createTransform(context).createTransform();
+
+ SeaTunnelRowType sourceType = initSourceType(pluginConfig,
stream.getDataStream());
+ transform.setJobContext(jobContext);
+ DataStream<Row> inputStream =
+ flinkTransform(sourceType, transform,
stream.getDataStream());
+ stageType(pluginConfig,
transform.getProducedCatalogTable().getSeaTunnelRowType());
+ registerResultTable(pluginConfig, inputStream);
+ upstreamDataStreams.add(
+ new DataStreamTableInfo(
+ inputStream,
+ transform.getProducedCatalogTable(),
+ pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+ ?
pluginConfig.getString(RESULT_TABLE_NAME.key())
+ : null));
} catch (Exception e) {
throw new TaskExecuteException(
String.format(
"SeaTunnel transform task: %s execute error",
- plugins.get(i).getPluginName()),
+ plugins.get(i).factoryIdentifier()),
e);
}
}
- return result;
+ return upstreamDataStreams;
}
protected DataStream<Row> flinkTransform(
@@ -119,7 +120,7 @@ public class TransformExecuteProcessor
TypeInformation rowTypeInfo =
TypeConverterUtils.convert(transform.getProducedType());
FlinkRowConverter transformInputRowConverter = new
FlinkRowConverter(sourceType);
FlinkRowConverter transformOutputRowConverter =
- new FlinkRowConverter(transform.getProducedType());
+ new
FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType());
DataStream<Row> output =
stream.flatMap(
new FlatMapFunction<Row, Row>() {
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 503f76b87a..0e3b18fb7d 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -21,13 +21,20 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -35,15 +42,16 @@ import
org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import com.google.common.collect.Lists;
-
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
public class SinkExecuteProcessor
- extends SparkAbstractPluginExecuteProcessor<SeaTunnelSink<?, ?, ?, ?>>
{
+ extends SparkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
private static final String PLUGIN_TYPE = PluginType.SINK.getType();
protected SinkExecuteProcessor(
@@ -54,29 +62,21 @@ public class SinkExecuteProcessor
}
@Override
- protected List<SeaTunnelSink<?, ?, ?, ?>> initializePlugins(
+ protected List<Optional<? extends Factory>> initializePlugins(
List<? extends Config> pluginConfigs) {
+ SeaTunnelFactoryDiscovery factoryDiscovery =
+ new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
- List<SeaTunnelSink<?, ?, ?, ?>> sinks =
+ List<Optional<? extends Factory>> sinks =
pluginConfigs.stream()
.map(
- sinkConfig -> {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
sinkConfig.getString(PLUGIN_NAME));
- pluginJars.addAll(
-
sinkPluginDiscovery.getPluginJarPaths(
-
Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSink<?, ?, ?, ?> seaTunnelSink =
-
sinkPluginDiscovery.createPluginInstance(
- pluginIdentifier);
- seaTunnelSink.prepare(sinkConfig);
- seaTunnelSink.setJobContext(jobContext);
- return seaTunnelSink;
- })
+ sinkConfig ->
+ PluginUtil.createSinkFactory(
+ factoryDiscovery,
+ sinkPluginDiscovery,
+ sinkConfig,
+ pluginJars))
.distinct()
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -84,14 +84,17 @@ public class SinkExecuteProcessor
}
@Override
- public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
+ public List<DatasetTableInfo> execute(List<DatasetTableInfo>
upstreamDataStreams)
throws TaskExecuteException {
- Dataset<Row> input = upstreamDataStreams.get(0);
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ DatasetTableInfo input = upstreamDataStreams.get(0);
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
- SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
- Dataset<Row> dataset =
- fromSourceTable(sinkConfig,
sparkRuntimeEnvironment).orElse(input);
+ DatasetTableInfo datasetTableInfo =
+ fromSourceTable(sinkConfig, sparkRuntimeEnvironment,
upstreamDataStreams)
+ .orElse(input);
+ Dataset<Row> dataset = datasetTableInfo.getDataset();
int parallelism;
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
@@ -104,19 +107,64 @@ public class SinkExecuteProcessor
CommonOptions.PARALLELISM.defaultValue());
}
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(),
parallelism);
+ Optional<? extends Factory> factory = plugins.get(i);
+ boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
+ SeaTunnelSink sink;
+ if (fallBack) {
+ sink =
+ fallbackCreateSink(
+ sinkPluginDiscovery,
+ PluginIdentifier.of(
+ ENGINE_TYPE,
+ PLUGIN_TYPE,
+
sinkConfig.getString(PLUGIN_NAME.key())),
+ sinkConfig);
+ sink.setJobContext(jobContext);
+ sink.setTypeInfo((SeaTunnelRowType)
TypeConverterUtils.convert(dataset.schema()));
+ } else {
+ TableSinkFactoryContext context =
+ new TableSinkFactoryContext(
+ datasetTableInfo.getCatalogTable(),
+ ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader);
+
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+ sink = ((TableSinkFactory)
factory.get()).createSink(context).createSink();
+ sink.setJobContext(jobContext);
+ }
// TODO modify checkpoint location
- seaTunnelSink.setTypeInfo(
- (SeaTunnelRowType)
TypeConverterUtils.convert(dataset.schema()));
- if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
+ if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+ SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
DataSaveMode dataSaveMode =
saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
- SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
+ SparkSinkInjector.inject(dataset.write(), sink)
.option("checkpointLocation", "/tmp")
.save();
}
// the sink is the last stream
return null;
}
+
+ public boolean isFallback(Factory factory) {
+ try {
+ ((TableSinkFactory) factory).createSink(null);
+ } catch (Exception e) {
+ if (e instanceof UnsupportedOperationException
+ && "The Factory has not been implemented and the
deprecated Plugin will be used."
+ .equals(e.getMessage())) {
+ return true;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public SeaTunnelSink fallbackCreateSink(
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+ PluginIdentifier pluginIdentifier,
+ Config pluginConfig) {
+ SeaTunnelSink source =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+ source.prepare(pluginConfig);
+ return source;
+ }
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/DatasetTableInfo.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/DatasetTableInfo.java
new file mode 100644
index 0000000000..3af8a5ad6d
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/DatasetTableInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.spark.execution;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class DatasetTableInfo {
+
+ private Dataset<Row> dataset;
+
+ private CatalogTable catalogTable;
+
+ private String tableName;
+}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 8afffe1add..c85a10389d 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -21,13 +21,20 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -36,15 +43,16 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
-import com.google.common.collect.Lists;
-
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
public class SinkExecuteProcessor
- extends SparkAbstractPluginExecuteProcessor<SeaTunnelSink<?, ?, ?, ?>>
{
+ extends SparkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
private static final String PLUGIN_TYPE = PluginType.SINK.getType();
protected SinkExecuteProcessor(
@@ -55,29 +63,21 @@ public class SinkExecuteProcessor
}
@Override
- protected List<SeaTunnelSink<?, ?, ?, ?>> initializePlugins(
+ protected List<Optional<? extends Factory>> initializePlugins(
List<? extends Config> pluginConfigs) {
+ SeaTunnelFactoryDiscovery factoryDiscovery =
+ new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
- List<SeaTunnelSink<?, ?, ?, ?>> sinks =
+ List<Optional<? extends Factory>> sinks =
pluginConfigs.stream()
.map(
- sinkConfig -> {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
sinkConfig.getString(PLUGIN_NAME));
- pluginJars.addAll(
-
sinkPluginDiscovery.getPluginJarPaths(
-
Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSink<?, ?, ?, ?> seaTunnelSink =
-
sinkPluginDiscovery.createPluginInstance(
- pluginIdentifier);
- seaTunnelSink.prepare(sinkConfig);
- seaTunnelSink.setJobContext(jobContext);
- return seaTunnelSink;
- })
+ sinkConfig ->
+ PluginUtil.createSinkFactory(
+ factoryDiscovery,
+ sinkPluginDiscovery,
+ sinkConfig,
+ new ArrayList<>()))
.distinct()
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -85,14 +85,17 @@ public class SinkExecuteProcessor
}
@Override
- public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
+ public List<DatasetTableInfo> execute(List<DatasetTableInfo>
upstreamDataStreams)
throws TaskExecuteException {
- Dataset<Row> input = upstreamDataStreams.get(0);
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ DatasetTableInfo input = upstreamDataStreams.get(0);
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
- SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
- Dataset<Row> dataset =
- fromSourceTable(sinkConfig,
sparkRuntimeEnvironment).orElse(input);
+ DatasetTableInfo datasetTableInfo =
+ fromSourceTable(sinkConfig, sparkRuntimeEnvironment,
upstreamDataStreams)
+ .orElse(input);
+ Dataset<Row> dataset = datasetTableInfo.getDataset();
int parallelism;
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
@@ -105,15 +108,37 @@ public class SinkExecuteProcessor
CommonOptions.PARALLELISM.defaultValue());
}
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(),
parallelism);
+ Optional<? extends Factory> factory = plugins.get(i);
+ boolean fallBack = !factory.isPresent() ||
isFallback(factory.get());
+ SeaTunnelSink sink;
+ if (fallBack) {
+ sink =
+ fallbackCreateSink(
+ sinkPluginDiscovery,
+ PluginIdentifier.of(
+ ENGINE_TYPE,
+ PLUGIN_TYPE,
+                                        sinkConfig.getString(PLUGIN_NAME.key())),
+ sinkConfig);
+ sink.setJobContext(jobContext);
+                sink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
+ } else {
+ TableSinkFactoryContext context =
+ new TableSinkFactoryContext(
+ datasetTableInfo.getCatalogTable(),
+ ReadonlyConfig.fromConfig(sinkConfig),
+ classLoader);
+                ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+                sink = ((TableSinkFactory) factory.get()).createSink(context).createSink();
+ sink.setJobContext(jobContext);
+ }
// TODO modify checkpoint location
- seaTunnelSink.setTypeInfo(
- (SeaTunnelRowType)
TypeConverterUtils.convert(dataset.schema()));
- if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
+ if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+ SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
DataSaveMode dataSaveMode =
saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
- SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
+ SparkSinkInjector.inject(dataset.write(), sink)
.option("checkpointLocation", "/tmp")
.mode(SaveMode.Append)
.save();
@@ -121,4 +146,27 @@ public class SinkExecuteProcessor
// the sink is the last stream
return null;
}
+
+ public boolean isFallback(Factory factory) {
+ try {
+ ((TableSinkFactory) factory).createSink(null);
+ } catch (Exception e) {
+ if (e instanceof UnsupportedOperationException
+ && "The Factory has not been implemented and the
deprecated Plugin will be used."
+ .equals(e.getMessage())) {
+ return true;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public SeaTunnelSink fallbackCreateSink(
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+ PluginIdentifier pluginIdentifier,
+ Config pluginConfig) {
+        SeaTunnelSink source = sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+ source.prepare(pluginConfig);
+ return source;
+ }
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index d68aec3c23..126f14edb1 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -22,9 +22,13 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
+import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -40,8 +44,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-public class SourceExecuteProcessor
- extends SparkAbstractPluginExecuteProcessor<SeaTunnelSource<?, ?, ?>> {
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+
+@SuppressWarnings("rawtypes")
+public class SourceExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<SourceTableInfo> {
private static final String PLUGIN_TYPE = "source";
public SourceExecuteProcessor(
@@ -52,10 +59,11 @@ public class SourceExecuteProcessor
}
@Override
- public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
- List<Dataset<Row>> sources = new ArrayList<>();
+ public List<DatasetTableInfo> execute(List<DatasetTableInfo>
upstreamDataStreams) {
+ List<DatasetTableInfo> sources = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
- SeaTunnelSource<?, ?, ?> source = plugins.get(i);
+ SourceTableInfo sourceTableInfo = plugins.get(i);
+ SeaTunnelSource<?, ?, ?> source = sourceTableInfo.getSource();
Config pluginConfig = pluginConfigs.get(i);
int parallelism;
if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
@@ -81,29 +89,40 @@ public class SourceExecuteProcessor
(StructType)
TypeConverterUtils.convert(source.getProducedType()))
.load();
- sources.add(dataset);
+ sources.add(
+ new DatasetTableInfo(
+ dataset,
+ sourceTableInfo.getCatalogTables().get(0),
+ pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+                            ? pluginConfig.getString(RESULT_TABLE_NAME.key())
+ : null));
registerInputTempView(pluginConfigs.get(i), dataset);
}
return sources;
}
@Override
- protected List<SeaTunnelSource<?, ?, ?>> initializePlugins(
- List<? extends Config> pluginConfigs) {
+ protected List<SourceTableInfo> initializePlugins(List<? extends Config>
pluginConfigs) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new
SeaTunnelSourcePluginDiscovery();
- List<SeaTunnelSource<?, ?, ?>> sources = new ArrayList<>();
+ SeaTunnelFactoryDiscovery factoryDiscovery =
+ new SeaTunnelFactoryDiscovery(TableSourceFactory.class);
+
+ List<SourceTableInfo> sources = new ArrayList<>();
Set<URL> jars = new HashSet<>();
for (Config sourceConfig : pluginConfigs) {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(
-                            ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
+                            ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME.key()));
jars.addAll(
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSource<?, ?, ?> seaTunnelSource =
-
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
- seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setJobContext(jobContext);
- sources.add(seaTunnelSource);
+ SourceTableInfo source =
+ PluginUtil.createSource(
+ factoryDiscovery,
+ sourcePluginDiscovery,
+ pluginIdentifier,
+ sourceConfig,
+ jobContext);
+ sources.add(source);
}
sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars));
return sources;
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
index ebfcaf6e91..a431f23dd1 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.spark.sql.Dataset;
@@ -28,15 +29,15 @@ import org.apache.spark.sql.Row;
import java.util.List;
import java.util.Optional;
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+
public abstract class SparkAbstractPluginExecuteProcessor<T>
- implements PluginExecuteProcessor<Dataset<Row>,
SparkRuntimeEnvironment> {
+ implements PluginExecuteProcessor<DatasetTableInfo,
SparkRuntimeEnvironment> {
protected SparkRuntimeEnvironment sparkRuntimeEnvironment;
protected final List<? extends Config> pluginConfigs;
protected final JobContext jobContext;
protected final List<T> plugins;
protected static final String ENGINE_TYPE = "seatunnel";
- protected static final String PLUGIN_NAME = "plugin_name";
- protected static final String RESULT_TABLE_NAME = "result_table_name";
protected static final String SOURCE_TABLE_NAME = "source_table_name";
protected SparkAbstractPluginExecuteProcessor(
@@ -57,19 +58,34 @@ public abstract class SparkAbstractPluginExecuteProcessor<T>
protected abstract List<T> initializePlugins(List<? extends Config>
pluginConfigs);
protected void registerInputTempView(Config pluginConfig, Dataset<Row>
dataStream) {
- if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
- String tableName = pluginConfig.getString(RESULT_TABLE_NAME);
+ if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
+ String tableName = pluginConfig.getString(RESULT_TABLE_NAME.key());
registerTempView(tableName, dataStream);
}
}
- protected Optional<Dataset<Row>> fromSourceTable(
- Config pluginConfig, SparkRuntimeEnvironment
sparkRuntimeEnvironment) {
+ protected Optional<DatasetTableInfo> fromSourceTable(
+ Config pluginConfig,
+ SparkRuntimeEnvironment sparkRuntimeEnvironment,
+ List<DatasetTableInfo> upstreamDataStreams) {
if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
return Optional.empty();
}
String sourceTableName = pluginConfig.getString(SOURCE_TABLE_NAME);
- return
Optional.of(sparkRuntimeEnvironment.getSparkSession().read().table(sourceTableName));
+ DatasetTableInfo datasetTableInfo =
+ upstreamDataStreams.stream()
+ .filter(info ->
sourceTableName.equals(info.getTableName()))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new SeaTunnelException(
+ String.format(
+ "table %s not found",
sourceTableName)));
+ return Optional.of(
+ new DatasetTableInfo(
+
sparkRuntimeEnvironment.getSparkSession().read().table(sourceTableName),
+ datasetTableInfo.getCatalogTable(),
+ sourceTableName));
}
private void registerTempView(String tableName, Dataset<Row> ds) {
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index dab00cbdb4..48aa9abb2a 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -27,9 +27,6 @@ import
org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.execution.TaskExecution;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
@@ -39,11 +36,11 @@ import java.util.List;
@Slf4j
public class SparkExecution implements TaskExecution {
private final SparkRuntimeEnvironment sparkRuntimeEnvironment;
- private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
+ private final PluginExecuteProcessor<DatasetTableInfo,
SparkRuntimeEnvironment>
sourcePluginExecuteProcessor;
- private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
+ private final PluginExecuteProcessor<DatasetTableInfo,
SparkRuntimeEnvironment>
transformPluginExecuteProcessor;
- private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
+ private final PluginExecuteProcessor<DatasetTableInfo,
SparkRuntimeEnvironment>
sinkPluginExecuteProcessor;
public SparkExecution(Config config) {
@@ -68,7 +65,7 @@ public class SparkExecution implements TaskExecution {
@Override
public void execute() throws TaskExecuteException {
- List<Dataset<Row>> datasets = new ArrayList<>();
+ List<DatasetTableInfo> datasets = new ArrayList<>();
datasets = sourcePluginExecuteProcessor.execute(datasets);
datasets = transformPluginExecuteProcessor.execute(datasets);
sinkPluginExecuteProcessor.execute(datasets);
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index fc9be55925..da76e027db 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -20,11 +20,15 @@ package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import
org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -37,22 +41,22 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
-import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+
@Slf4j
public class TransformExecuteProcessor
- extends SparkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
-
- private static final String PLUGIN_TYPE = "transform";
+ extends SparkAbstractPluginExecuteProcessor<TableTransformFactory> {
protected TransformExecuteProcessor(
SparkRuntimeEnvironment sparkRuntimeEnvironment,
@@ -62,29 +66,18 @@ public class TransformExecuteProcessor
}
@Override
- protected List<SeaTunnelTransform> initializePlugins(List<? extends
Config> pluginConfigs) {
+ protected List<TableTransformFactory> initializePlugins(List<? extends
Config> pluginConfigs) {
SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
new SeaTunnelTransformPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
- List<SeaTunnelTransform> transforms =
+ List<TableTransformFactory> transforms =
pluginConfigs.stream()
.map(
- transformConfig -> {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
-
transformConfig.getString(PLUGIN_NAME));
- pluginJars.addAll(
-
transformPluginDiscovery.getPluginJarPaths(
-
Lists.newArrayList(pluginIdentifier)));
- SeaTunnelTransform pluginInstance =
-
transformPluginDiscovery.createPluginInstance(
- pluginIdentifier);
- pluginInstance.prepare(transformConfig);
- pluginInstance.setJobContext(jobContext);
- return pluginInstance;
- })
+ transformConfig ->
+ PluginUtil.createTransformFactory(
+ transformPluginDiscovery,
+ transformConfig,
+ pluginJars))
.distinct()
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -92,31 +85,46 @@ public class TransformExecuteProcessor
}
@Override
- public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
+ public List<DatasetTableInfo> execute(List<DatasetTableInfo>
upstreamDataStreams)
throws TaskExecuteException {
if (plugins.isEmpty()) {
return upstreamDataStreams;
}
- Dataset<Row> input = upstreamDataStreams.get(0);
- List<Dataset<Row>> result = new ArrayList<>();
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ DatasetTableInfo input = upstreamDataStreams.get(0);
for (int i = 0; i < plugins.size(); i++) {
try {
- SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
- Dataset<Row> stream =
- fromSourceTable(pluginConfig,
sparkRuntimeEnvironment).orElse(input);
- input = sparkTransform(transform, stream);
- registerInputTempView(pluginConfig, input);
- result.add(input);
+ DatasetTableInfo dataset =
+ fromSourceTable(pluginConfig, sparkRuntimeEnvironment,
upstreamDataStreams)
+ .orElse(input);
+ TableTransformFactory factory = plugins.get(i);
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
+
Collections.singletonList(dataset.getCatalogTable()),
+ ReadonlyConfig.fromConfig(pluginConfig),
+ classLoader);
+
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
+ SeaTunnelTransform transform =
factory.createTransform(context).createTransform();
+
+ Dataset<Row> inputDataset = sparkTransform(transform,
dataset.getDataset());
+ registerInputTempView(pluginConfig, inputDataset);
+ upstreamDataStreams.add(
+ new DatasetTableInfo(
+ inputDataset,
+ transform.getProducedCatalogTable(),
+ pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+ ?
pluginConfig.getString(RESULT_TABLE_NAME.key())
+ : null));
} catch (Exception e) {
throw new TaskExecuteException(
String.format(
"SeaTunnel transform task: %s execute error",
- plugins.get(i).getPluginName()),
+ plugins.get(i).factoryIdentifier()),
e);
}
}
- return result;
+ return upstreamDataStreams;
}
private Dataset<Row> sparkTransform(SeaTunnelTransform transform,
Dataset<Row> stream)
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
index a1b8ec87e7..fe1185c960 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
@@ -45,7 +45,7 @@ import static
org.apache.seatunnel.api.common.CommonOptions.FACTORY_ID;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.SOURCE_TABLE_NAME;
-import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.DEFAULT_ID;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
@Slf4j
public final class ConfigParserUtil {
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 46c28b6397..de1449b0b4 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -54,9 +55,8 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.DEFAULT_ID;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
-import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.ensureJobModeMatch;
import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode;
@Data
@@ -84,7 +84,7 @@ public class JobConfigParser {
// old logic: prepare(initialization) -> set job context
source.prepare(config);
source.setJobContext(jobConfig.getJobContext());
- ensureJobModeMatch(jobConfig.getJobContext(), source);
+ PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
String actionName =
createSourceActionName(
0, config.getString(CollectionConstants.PLUGIN_NAME),
getTableName(config));
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index d38b5aacb7..c501cb9a1a 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.engine.core.parse;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.DataSaveMode;
@@ -40,7 +39,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -58,8 +57,6 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;
@@ -82,6 +79,7 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
import static
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId;
import static
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryUrls;
import static
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputIds;
@@ -89,10 +87,6 @@ import static
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputId
@Slf4j
public class MultipleTableJobConfigParser {
- private static final ILogger LOGGER =
Logger.getLogger(MultipleTableJobConfigParser.class);
-
- static final String DEFAULT_ID = "default-identifier";
-
private final IdGenerator idGenerator;
private final JobConfig jobConfig;
@@ -320,7 +314,7 @@ public class MultipleTableJobConfigParser {
JobConfigParser.createSourceActionName(configIndex,
factoryId, tableId);
SeaTunnelSource<Object, SourceSplit, Serializable> source =
tuple2._1();
source.setJobContext(jobConfig.getJobContext());
- ensureJobModeMatch(jobConfig.getJobContext(), source);
+ PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
SourceAction<Object, SourceSplit, Serializable> action =
new SourceAction<>(id, actionName, tuple2._1(),
factoryUrls);
action.setParallelism(parallelism);
@@ -331,16 +325,6 @@ public class MultipleTableJobConfigParser {
return new Tuple2<>(tableId, actions);
}
- public static void ensureJobModeMatch(JobContext jobContext,
SeaTunnelSource source) {
- if (jobContext.getJobMode() == JobMode.BATCH
- && source.getBoundedness()
- ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
- throw new JobDefineCheckException(
- String.format(
- "'%s' source don't support off-line job.",
source.getPluginName()));
- }
- }
-
public void parseTransforms(
List<? extends Config> transformConfigs,
ClassLoader classLoader,
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index a2006f7dc9..5042ecfbf0 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -166,22 +166,24 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
return pluginIdentifiers;
}
- public Path getPluginDir() {
- return pluginDir;
- }
-
@Override
public T createPluginInstance(PluginIdentifier pluginIdentifier) {
return (T) createPluginInstance(pluginIdentifier,
Collections.EMPTY_LIST);
}
@Override
- public T createPluginInstance(PluginIdentifier pluginIdentifier,
Collection<URL> pluginJars) {
+ public Optional<T> createOptionalPluginInstance(PluginIdentifier
pluginIdentifier) {
+ return createOptionalPluginInstance(pluginIdentifier,
Collections.EMPTY_LIST);
+ }
+
+ @Override
+ public Optional<T> createOptionalPluginInstance(
+ PluginIdentifier pluginIdentifier, Collection<URL> pluginJars) {
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
T pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
if (pluginInstance != null) {
log.info("Load plugin: {} from classpath", pluginIdentifier);
- return pluginInstance;
+ return Optional.of(pluginInstance);
}
Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
// if the plugin jar not exist in classpath, will load from plugin dir.
@@ -214,9 +216,18 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
pluginIdentifier,
pluginJarPath.get(),
classLoader.getClass().getName());
- return pluginInstance;
+ return Optional.of(pluginInstance);
}
}
+ return Optional.empty();
+ }
+
+ @Override
+ public T createPluginInstance(PluginIdentifier pluginIdentifier,
Collection<URL> pluginJars) {
+ Optional<T> instance = createOptionalPluginInstance(pluginIdentifier,
pluginJars);
+ if (instance.isPresent()) {
+ return instance.get();
+ }
throw new RuntimeException("Plugin " + pluginIdentifier + " not
found.");
}
@@ -286,7 +297,7 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
return plugins;
}
- private T loadPluginInstance(PluginIdentifier pluginIdentifier,
ClassLoader classLoader) {
+ protected T loadPluginInstance(PluginIdentifier pluginIdentifier,
ClassLoader classLoader) {
ServiceLoader<T> serviceLoader =
ServiceLoader.load(getPluginBaseClass(), classLoader);
for (T t : serviceLoader) {
if (t instanceof PluginIdentifierInterface) {
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index 728ae15bab..3d45e719b2 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.plugin.discovery;
import java.net.URL;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
/**
* Plugins discovery interface, used to find plugin. Each plugin type should
have its own
@@ -53,6 +54,24 @@ public interface PluginDiscovery<T> {
*/
T createPluginInstance(PluginIdentifier pluginIdentifier, Collection<URL>
pluginJars);
+ /**
+ * Get plugin instance by plugin identifier.
+ *
+ * @param pluginIdentifier plugin identifier.
+ * @return plugin instance. If not found, return Optional.empty().
+ */
+ Optional<T> createOptionalPluginInstance(PluginIdentifier
pluginIdentifier);
+
+ /**
+ * Get plugin instance by plugin identifier.
+ *
+ * @param pluginIdentifier plugin identifier.
+ * @param pluginJars used to help plugin load
+ * @return plugin instance. If not found, return Optional.empty().
+ */
+ Optional<T> createOptionalPluginInstance(
+ PluginIdentifier pluginIdentifier, Collection<URL> pluginJars);
+
/**
* Get all plugin instances.
*
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
new file mode 100644
index 0000000000..9fe8717488
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.seatunnel;
+
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URL;
+import java.util.ServiceLoader;
+import java.util.function.BiConsumer;
+
+public class SeaTunnelFactoryDiscovery extends
AbstractPluginDiscovery<Factory> {
+
+ private final Class<? extends Factory> factoryClass;
+
+ public SeaTunnelFactoryDiscovery(Class<? extends Factory> factoryClass) {
+ super();
+ this.factoryClass = factoryClass;
+ }
+
+ public SeaTunnelFactoryDiscovery(
+ Class<? extends Factory> factoryClass,
+ BiConsumer<ClassLoader, URL> addURLToClassLoader) {
+ super(addURLToClassLoader);
+ this.factoryClass = factoryClass;
+ }
+
+ @Override
+ protected Class<Factory> getPluginBaseClass() {
+ return Factory.class;
+ }
+
+ @Override
+ protected Factory loadPluginInstance(
+ PluginIdentifier pluginIdentifier, ClassLoader classLoader) {
+ ServiceLoader<Factory> serviceLoader =
+ ServiceLoader.load(getPluginBaseClass(), classLoader);
+ for (Factory factory : serviceLoader) {
+ if (factoryClass.isInstance(factory)) {
+ String factoryIdentifier = factory.factoryIdentifier();
+ String pluginName = pluginIdentifier.getPluginName();
+ if (StringUtils.equalsIgnoreCase(factoryIdentifier,
pluginName)) {
+ return factory;
+ }
+ }
+ }
+ return null;
+ }
+}