This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f083a86c0f [Improve] Refactor Spark/Flink execution processor (#5595)
f083a86c0f is described below

commit f083a86c0f6a2fe14ce52de1baec059d27ddbcf3
Author: Jia Fan <[email protected]>
AuthorDate: Mon Oct 16 10:44:37 2023 +0800

    [Improve] Refactor Spark/Flink execution processor (#5595)
---
 .../api/table/catalog/CatalogTableUtil.java        |  19 +++
 .../seatunnel/api/table/factory/FactoryUtil.java   |  19 +--
 .../core/starter/execution/PluginUtil.java         | 176 +++++++++++++++++++++
 .../core/starter/execution/SourceTableInfo.java    |  35 ++++
 .../flink/execution/SinkExecuteProcessor.java      | 133 ++++++++++------
 .../flink/execution/DataStreamTableInfo.java       |  37 +++++
 .../FlinkAbstractPluginExecuteProcessor.java       |  28 +++-
 .../starter/flink/execution/FlinkExecution.java    |  10 +-
 .../flink/execution/SinkExecuteProcessor.java      | 135 ++++++++++------
 .../flink/execution/SourceExecuteProcessor.java    |  59 ++++---
 .../flink/execution/TransformExecuteProcessor.java |  95 +++++------
 .../spark/execution/SinkExecuteProcessor.java      | 110 +++++++++----
 .../starter/spark/execution/DatasetTableInfo.java  |  37 +++++
 .../spark/execution/SinkExecuteProcessor.java      | 110 +++++++++----
 .../spark/execution/SourceExecuteProcessor.java    |  49 ++++--
 .../SparkAbstractPluginExecuteProcessor.java       |  32 +++-
 .../starter/spark/execution/SparkExecution.java    |  11 +-
 .../spark/execution/TransformExecuteProcessor.java |  76 +++++----
 .../engine/core/parse/ConfigParserUtil.java        |   2 +-
 .../engine/core/parse/JobConfigParser.java         |   6 +-
 .../core/parse/MultipleTableJobConfigParser.java   |  22 +--
 .../plugin/discovery/AbstractPluginDiscovery.java  |  27 +++-
 .../plugin/discovery/PluginDiscovery.java          |  19 +++
 .../seatunnel/SeaTunnelFactoryDiscovery.java       |  67 ++++++++
 24 files changed, 959 insertions(+), 355 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 557759ef3e..880d97ca46 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -190,6 +190,25 @@ public class CatalogTableUtil implements Serializable {
         }
     }
 
+    public static List<CatalogTable> convertDataTypeToCatalogTables(
+            SeaTunnelDataType<?> seaTunnelDataType, String tableId) {
+        List<CatalogTable> catalogTables;
+        if (seaTunnelDataType instanceof MultipleRowType) {
+            catalogTables = new ArrayList<>();
+            for (String id : ((MultipleRowType) 
seaTunnelDataType).getTableIds()) {
+                catalogTables.add(
+                        CatalogTableUtil.getCatalogTable(
+                                id, ((MultipleRowType) 
seaTunnelDataType).getRowType(id)));
+            }
+        } else {
+            catalogTables =
+                    Collections.singletonList(
+                            CatalogTableUtil.getCatalogTable(
+                                    tableId, (SeaTunnelRowType) 
seaTunnelDataType));
+        }
+        return catalogTables;
+    }
+
     public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
         if (readonlyConfig.get(TableSchemaOptions.SCHEMA) == null) {
             throw new RuntimeException(
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 48ed785c39..9892e1b8bc 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -32,9 +32,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSource;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 
 import org.slf4j.Logger;
@@ -62,7 +60,7 @@ public final class FactoryUtil {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtil.class);
 
-    static final String DEFAULT_ID = "default-identifier";
+    public static final String DEFAULT_ID = "default-identifier";
 
     public static <T, SplitT extends SourceSplit, StateT extends Serializable>
             List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, 
List<CatalogTable>>>
@@ -86,19 +84,8 @@ public final class FactoryUtil {
                 SeaTunnelDataType<T> seaTunnelDataType = 
source.getProducedType();
                 final String tableId =
                         
options.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
-                if (seaTunnelDataType instanceof MultipleRowType) {
-                    catalogTables = new ArrayList<>();
-                    for (String id : ((MultipleRowType) 
seaTunnelDataType).getTableIds()) {
-                        catalogTables.add(
-                                CatalogTableUtil.getCatalogTable(
-                                        id, ((MultipleRowType) 
seaTunnelDataType).getRowType(id)));
-                    }
-                } else {
-                    catalogTables =
-                            Collections.singletonList(
-                                    CatalogTableUtil.getCatalogTable(
-                                            tableId, (SeaTunnelRowType) 
seaTunnelDataType));
-                }
+                catalogTables =
+                        
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
             }
             LOG.info(
                     "get the CatalogTable from source {}: {}",
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
new file mode 100644
index 0000000000..0b1f34d3c1
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.execution;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryException;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
+
+import com.google.common.collect.Lists;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
+import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+
+/** The util used for Spark/Flink to create to SeaTunnelSource etc. */
+public class PluginUtil {
+
+    protected static final String ENGINE_TYPE = "seatunnel";
+
+    public static SourceTableInfo createSource(
+            SeaTunnelFactoryDiscovery factoryDiscovery,
+            SeaTunnelSourcePluginDiscovery sourcePluginDiscovery,
+            PluginIdentifier pluginIdentifier,
+            Config pluginConfig,
+            JobContext jobContext) {
+        // get current thread classloader
+        ClassLoader classLoader =
+                Thread.currentThread()
+                        .getContextClassLoader(); // try to find factory of 
this plugin
+
+        final ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(pluginConfig);
+        // try to find table source factory
+        final Optional<Factory> sourceFactory =
+                
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
+        final boolean fallback = isFallback(sourceFactory);
+        SeaTunnelSource source;
+        if (fallback) {
+            source = fallbackCreate(sourcePluginDiscovery, pluginIdentifier, 
pluginConfig);
+        } else {
+            // create source with source factory
+            TableSourceFactoryContext context =
+                    new TableSourceFactoryContext(readonlyConfig, classLoader);
+            
ConfigValidator.of(context.getOptions()).validate(sourceFactory.get().optionRule());
+            TableSource tableSource =
+                    ((TableSourceFactory) 
sourceFactory.get()).createSource(context);
+            source = tableSource.createSource();
+        }
+        source.setJobContext(jobContext);
+        ensureJobModeMatch(jobContext, source);
+        List<CatalogTable> catalogTables;
+        try {
+            catalogTables = source.getProducedCatalogTables();
+        } catch (UnsupportedOperationException e) {
+            // TODO remove it when all connector use `getProducedCatalogTables`
+            SeaTunnelDataType<?> seaTunnelDataType = source.getProducedType();
+            final String tableId =
+                    
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+            catalogTables =
+                    
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
+        }
+
+        if (catalogTables.size() != 1) {
+            throw new SeaTunnelException(
+                    String.format("Unsupported table number: %d on flink", 
catalogTables.size()));
+        }
+        return new SourceTableInfo(source, catalogTables);
+    }
+
+    private static boolean isFallback(Optional<Factory> factory) {
+        if (!factory.isPresent()) {
+            return true;
+        }
+        try {
+            ((TableSourceFactory) factory.get()).createSource(null);
+        } catch (Exception e) {
+            if (e instanceof UnsupportedOperationException
+                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
+                            .equals(e.getMessage())) {
+                return true;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private static SeaTunnelSource fallbackCreate(
+            SeaTunnelSourcePluginDiscovery sourcePluginDiscovery,
+            PluginIdentifier pluginIdentifier,
+            Config pluginConfig) {
+        SeaTunnelSource source = 
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
+        source.prepare(pluginConfig);
+        return source;
+    }
+
+    public static TableTransformFactory createTransformFactory(
+            SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
+            Config transformConfig,
+            List<URL> pluginJars) {
+        PluginIdentifier pluginIdentifier =
+                PluginIdentifier.of(
+                        ENGINE_TYPE, "transform", 
transformConfig.getString(PLUGIN_NAME.key()));
+        final ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(transformConfig);
+        final String factoryId = readonlyConfig.get(PLUGIN_NAME);
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        final TableTransformFactory factory =
+                discoverFactory(classLoader, TableTransformFactory.class, 
factoryId);
+        pluginJars.addAll(
+                
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+        return factory;
+    }
+
+    public static Optional<? extends Factory> createSinkFactory(
+            SeaTunnelFactoryDiscovery factoryDiscovery,
+            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+            Config sinkConfig,
+            List<URL> pluginJars) {
+        PluginIdentifier pluginIdentifier =
+                PluginIdentifier.of(ENGINE_TYPE, "sink", 
sinkConfig.getString(PLUGIN_NAME.key()));
+        pluginJars.addAll(
+                
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+        try {
+            return 
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
+        } catch (FactoryException e) {
+            return Optional.empty();
+        }
+    }
+
+    public static void ensureJobModeMatch(JobContext jobContext, 
SeaTunnelSource source) {
+        if (jobContext.getJobMode() == JobMode.BATCH
+                && source.getBoundedness()
+                        == 
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "'%s' source don't support off-line job.", 
source.getPluginName()));
+        }
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
new file mode 100644
index 0000000000..529b9b4207
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.execution;
+
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class SourceTableInfo {
+
+    private SeaTunnelSource source;
+
+    private List<CatalogTable> catalogTables;
+}
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index cf0ad1e7be..8ef78a9762 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,32 +21,35 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSink;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.types.Row;
 
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
 public class SinkExecuteProcessor
-        extends FlinkAbstractPluginExecuteProcessor<
-                SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>> {
+        extends FlinkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
 
     private static final String PLUGIN_TYPE = PluginType.SINK.getType();
 
@@ -56,60 +59,67 @@ public class SinkExecuteProcessor
     }
 
     @Override
-    protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>>
-            initializePlugins(List<URL> jarPaths, List<? extends Config> 
pluginConfigs) {
+    protected List<Optional<? extends Factory>> initializePlugins(
+            List<URL> jarPaths, List<? extends Config> pluginConfigs) {
+        SeaTunnelFactoryDiscovery factoryDiscovery =
+                new SeaTunnelFactoryDiscovery(TableSinkFactory.class, 
ADD_URL_TO_CLASSLOADER);
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
                 new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
-        List<URL> pluginJars = new ArrayList<>();
-        List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>> sinks =
-                pluginConfigs.stream()
-                        .map(
-                                sinkConfig -> {
-                                    PluginIdentifier pluginIdentifier =
-                                            PluginIdentifier.of(
-                                                    ENGINE_TYPE,
-                                                    PLUGIN_TYPE,
-                                                    
sinkConfig.getString(PLUGIN_NAME));
-                                    pluginJars.addAll(
-                                            
sinkPluginDiscovery.getPluginJarPaths(
-                                                    
Lists.newArrayList(pluginIdentifier)));
-                                    SeaTunnelSink<
-                                                    SeaTunnelRow,
-                                                    Serializable,
-                                                    Serializable,
-                                                    Serializable>
-                                            seaTunnelSink =
-                                                    
sinkPluginDiscovery.createPluginInstance(
-                                                            pluginIdentifier);
-                                    seaTunnelSink.prepare(sinkConfig);
-                                    seaTunnelSink.setJobContext(jobContext);
-                                    return seaTunnelSink;
-                                })
-                        .distinct()
-                        .collect(Collectors.toList());
-        jarPaths.addAll(pluginJars);
-        return sinks;
+        return pluginConfigs.stream()
+                .map(
+                        sinkConfig ->
+                                PluginUtil.createSinkFactory(
+                                        factoryDiscovery,
+                                        sinkPluginDiscovery,
+                                        sinkConfig,
+                                        jarPaths))
+                .distinct()
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams)
+    public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> 
upstreamDataStreams)
             throws TaskExecuteException {
-        DataStream<Row> input = upstreamDataStreams.get(0);
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
+                new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
+        DataStreamTableInfo input = upstreamDataStreams.get(0);
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         for (int i = 0; i < plugins.size(); i++) {
             Config sinkConfig = pluginConfigs.get(i);
-            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable> seaTunnelSink =
-                    plugins.get(i);
-            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
-            SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
-            seaTunnelSink.setTypeInfo(sourceType);
-            if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
-                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) 
seaTunnelSink;
+            DataStreamTableInfo stream =
+                    fromSourceTable(sinkConfig, 
upstreamDataStreams).orElse(input);
+            Optional<? extends Factory> factory = plugins.get(i);
+            boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
+            SeaTunnelSink sink;
+            if (fallBack) {
+                sink =
+                        fallbackCreateSink(
+                                sinkPluginDiscovery,
+                                PluginIdentifier.of(
+                                        ENGINE_TYPE,
+                                        PLUGIN_TYPE,
+                                        
sinkConfig.getString(PLUGIN_NAME.key())),
+                                sinkConfig);
+                sink.setJobContext(jobContext);
+                SeaTunnelRowType sourceType = initSourceType(sinkConfig, 
stream.getDataStream());
+                sink.setTypeInfo(sourceType);
+            } else {
+                TableSinkFactoryContext context =
+                        new TableSinkFactoryContext(
+                                stream.getCatalogTable(),
+                                ReadonlyConfig.fromConfig(sinkConfig),
+                                classLoader);
+                
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+                sink = ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
+                sink.setJobContext(jobContext);
+            }
+            if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
                 DataSaveMode dataSaveMode = 
saveModeSink.getUserConfigSaveMode();
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
             DataStreamSink<Row> dataStreamSink =
-                    stream.sinkTo(new FlinkSink<>(seaTunnelSink))
-                            .name(seaTunnelSink.getPluginName());
+                    stream.getDataStream().sinkTo(new 
FlinkSink<>(sink)).name(sink.getPluginName());
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
@@ -118,4 +128,27 @@ public class SinkExecuteProcessor
         // the sink is the last stream
         return null;
     }
+
+    public boolean isFallback(Factory factory) {
+        try {
+            ((TableSinkFactory) factory).createSink(null);
+        } catch (Exception e) {
+            if (e instanceof UnsupportedOperationException
+                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
+                            .equals(e.getMessage())) {
+                return true;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public SeaTunnelSink fallbackCreateSink(
+            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+            PluginIdentifier pluginIdentifier,
+            Config pluginConfig) {
+        SeaTunnelSink source = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+        source.prepare(pluginConfig);
+        return source;
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
new file mode 100644
index 0000000000..7b158ee60b
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink.execution;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class DataStreamTableInfo {
+
+    private DataStream<Row> dataStream;
+
+    private CatalogTable catalogTable;
+
+    private String tableName;
+}
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index ed8b72a8f0..44b2f914fc 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
 import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
 import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
@@ -41,9 +42,9 @@ import java.util.function.BiConsumer;
 import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
 
 public abstract class FlinkAbstractPluginExecuteProcessor<T>
-        implements PluginExecuteProcessor<DataStream<Row>, 
FlinkRuntimeEnvironment> {
+        implements PluginExecuteProcessor<DataStreamTableInfo, 
FlinkRuntimeEnvironment> {
     protected static final String ENGINE_TYPE = "seatunnel";
-    protected static final String PLUGIN_NAME = "plugin_name";
+    protected static final String PLUGIN_NAME_KEY = "plugin_name";
     protected static final String SOURCE_TABLE_NAME = "source_table_name";
     protected static HashMap<String, Boolean> isAppendMap = new HashMap<>();
 
@@ -78,15 +79,30 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
         this.flinkRuntimeEnvironment = flinkRuntimeEnvironment;
     }
 
-    protected Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
+    protected Optional<DataStreamTableInfo> fromSourceTable(
+            Config pluginConfig, List<DataStreamTableInfo> 
upstreamDataStreams) {
         if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
             StreamTableEnvironment tableEnvironment =
                     flinkRuntimeEnvironment.getStreamTableEnvironment();
             String tableName = pluginConfig.getString(SOURCE_TABLE_NAME);
             Table table = tableEnvironment.from(tableName);
-            return Optional.ofNullable(
-                    TableUtil.tableToDataStream(
-                            tableEnvironment, table, 
isAppendMap.getOrDefault(tableName, true)));
+            DataStreamTableInfo dataStreamTableInfo =
+                    upstreamDataStreams.stream()
+                            .filter(info -> 
tableName.equals(info.getTableName()))
+                            .findFirst()
+                            .orElseThrow(
+                                    () ->
+                                            new SeaTunnelException(
+                                                    String.format(
+                                                            "table %s not 
found", tableName)));
+            return Optional.of(
+                    new DataStreamTableInfo(
+                            TableUtil.tableToDataStream(
+                                    tableEnvironment,
+                                    table,
+                                    isAppendMap.getOrDefault(tableName, true)),
+                            dataStreamTableInfo.getCatalogTable(),
+                            tableName));
         }
         return Optional.empty();
     }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 7c56cd68b1..3e079db3a3 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -35,8 +35,6 @@ import 
org.apache.seatunnel.core.starter.execution.TaskExecution;
 import org.apache.seatunnel.core.starter.flink.FlinkStarter;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -56,11 +54,11 @@ import java.util.stream.Stream;
 @Slf4j
 public class FlinkExecution implements TaskExecution {
     private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
-    private final PluginExecuteProcessor<DataStream<Row>, 
FlinkRuntimeEnvironment>
+    private final PluginExecuteProcessor<DataStreamTableInfo, 
FlinkRuntimeEnvironment>
             sourcePluginExecuteProcessor;
-    private final PluginExecuteProcessor<DataStream<Row>, 
FlinkRuntimeEnvironment>
+    private final PluginExecuteProcessor<DataStreamTableInfo, 
FlinkRuntimeEnvironment>
             transformPluginExecuteProcessor;
-    private final PluginExecuteProcessor<DataStream<Row>, 
FlinkRuntimeEnvironment>
+    private final PluginExecuteProcessor<DataStreamTableInfo, 
FlinkRuntimeEnvironment>
             sinkPluginExecuteProcessor;
     private final List<URL> jarPaths;
 
@@ -104,7 +102,7 @@ public class FlinkExecution implements TaskExecution {
 
     @Override
     public void execute() throws TaskExecuteException {
-        List<DataStream<Row>> dataStreams = new ArrayList<>();
+        List<DataStreamTableInfo> dataStreams = new ArrayList<>();
         dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
         dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
         sinkPluginExecuteProcessor.execute(dataStreams);
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 340351d1d4..2e6b742d67 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,33 +21,36 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSink;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
 import org.apache.flink.types.Row;
 
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
 public class SinkExecuteProcessor
-        extends FlinkAbstractPluginExecuteProcessor<
-                SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>> {
+        extends FlinkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
 
     private static final String PLUGIN_TYPE = PluginType.SINK.getType();
 
@@ -57,60 +60,69 @@ public class SinkExecuteProcessor
     }
 
     @Override
-    protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>>
-            initializePlugins(List<URL> jarPaths, List<? extends Config> 
pluginConfigs) {
+    protected List<Optional<? extends Factory>> initializePlugins(
+            List<URL> jarPaths, List<? extends Config> pluginConfigs) {
+        SeaTunnelFactoryDiscovery factoryDiscovery =
+                new SeaTunnelFactoryDiscovery(TableSinkFactory.class, 
ADD_URL_TO_CLASSLOADER);
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
                 new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
-        List<URL> pluginJars = new ArrayList<>();
-        List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>> sinks =
-                pluginConfigs.stream()
-                        .map(
-                                sinkConfig -> {
-                                    PluginIdentifier pluginIdentifier =
-                                            PluginIdentifier.of(
-                                                    ENGINE_TYPE,
-                                                    PLUGIN_TYPE,
-                                                    
sinkConfig.getString(PLUGIN_NAME));
-                                    pluginJars.addAll(
-                                            
sinkPluginDiscovery.getPluginJarPaths(
-                                                    
Lists.newArrayList(pluginIdentifier)));
-                                    SeaTunnelSink<
-                                                    SeaTunnelRow,
-                                                    Serializable,
-                                                    Serializable,
-                                                    Serializable>
-                                            seaTunnelSink =
-                                                    
sinkPluginDiscovery.createPluginInstance(
-                                                            pluginIdentifier);
-                                    seaTunnelSink.prepare(sinkConfig);
-                                    seaTunnelSink.setJobContext(jobContext);
-                                    return seaTunnelSink;
-                                })
-                        .distinct()
-                        .collect(Collectors.toList());
-        jarPaths.addAll(pluginJars);
-        return sinks;
+        return pluginConfigs.stream()
+                .map(
+                        sinkConfig ->
+                                PluginUtil.createSinkFactory(
+                                        factoryDiscovery,
+                                        sinkPluginDiscovery,
+                                        sinkConfig,
+                                        jarPaths))
+                .distinct()
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams)
+    public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> 
upstreamDataStreams)
             throws TaskExecuteException {
-        DataStream<Row> input = upstreamDataStreams.get(0);
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
+                new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
+        DataStreamTableInfo input = upstreamDataStreams.get(0);
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         for (int i = 0; i < plugins.size(); i++) {
             Config sinkConfig = pluginConfigs.get(i);
-            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable> seaTunnelSink =
-                    plugins.get(i);
-            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
-            SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
-            seaTunnelSink.setTypeInfo(sourceType);
-            if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
-                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) 
seaTunnelSink;
+            DataStreamTableInfo stream =
+                    fromSourceTable(sinkConfig, 
upstreamDataStreams).orElse(input);
+            Optional<? extends Factory> factory = plugins.get(i);
+            boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
+            SeaTunnelSink sink;
+            if (fallBack) {
+                sink =
+                        fallbackCreateSink(
+                                sinkPluginDiscovery,
+                                PluginIdentifier.of(
+                                        ENGINE_TYPE,
+                                        PLUGIN_TYPE,
+                                        
sinkConfig.getString(PLUGIN_NAME.key())),
+                                sinkConfig);
+                sink.setJobContext(jobContext);
+                SeaTunnelRowType sourceType = initSourceType(sinkConfig, 
stream.getDataStream());
+                sink.setTypeInfo(sourceType);
+            } else {
+                TableSinkFactoryContext context =
+                        new TableSinkFactoryContext(
+                                stream.getCatalogTable(),
+                                ReadonlyConfig.fromConfig(sinkConfig),
+                                classLoader);
+                
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+                sink = ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
+                sink.setJobContext(jobContext);
+            }
+            if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
                 DataSaveMode dataSaveMode = 
saveModeSink.getUserConfigSaveMode();
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
             DataStreamSink<Row> dataStreamSink =
-                    stream.sinkTo(SinkV1Adapter.wrap(new 
FlinkSink<>(seaTunnelSink)))
-                            .name(seaTunnelSink.getPluginName());
+                    stream.getDataStream()
+                            .sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(sink)))
+                            .name(sink.getPluginName());
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
@@ -119,4 +131,27 @@ public class SinkExecuteProcessor
         // the sink is the last stream
         return null;
     }
+
+    public boolean isFallback(Factory factory) {
+        try {
+            ((TableSinkFactory) factory).createSink(null);
+        } catch (Exception e) {
+            if (e instanceof UnsupportedOperationException
+                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
+                            .equals(e.getMessage())) {
+                return true;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public SeaTunnelSink fallbackCreateSink(
+            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+            PluginIdentifier pluginIdentifier,
+            Config pluginConfig) {
+        SeaTunnelSink source = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+        source.prepare(pluginConfig);
+        return source;
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 2f5e3a63ba..5636d231c5 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -23,11 +23,14 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportCoordinate;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.starter.enums.PluginType;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
+import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import 
org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction;
 import 
org.apache.seatunnel.translation.flink.source.SeaTunnelCoordinatedSource;
@@ -35,7 +38,6 @@ import 
org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -43,6 +45,7 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.types.Row;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 
 import java.net.URL;
 import java.util.ArrayList;
@@ -51,8 +54,12 @@ import java.util.List;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
 
-public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<SeaTunnelSource> {
+@Slf4j
+@SuppressWarnings("unchecked,rawtypes")
+public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<SourceTableInfo> {
     private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
     private Config envConfigs;
 
@@ -62,12 +69,13 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
     }
 
     @Override
-    public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams) {
+    public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> 
upstreamDataStreams) {
         StreamExecutionEnvironment executionEnvironment =
                 flinkRuntimeEnvironment.getStreamExecutionEnvironment();
-        List<DataStream<Row>> sources = new ArrayList<>();
+        List<DataStreamTableInfo> sources = new ArrayList<>();
         for (int i = 0; i < plugins.size(); i++) {
-            SeaTunnelSource internalSource = plugins.get(i);
+            SourceTableInfo sourceTableInfo = plugins.get(i);
+            SeaTunnelSource internalSource = sourceTableInfo.getSource();
             Config pluginConfig = pluginConfigs.get(i);
             BaseSeaTunnelSourceFunction sourceFunction;
             if (internalSource instanceof SupportCoordinate) {
@@ -94,7 +102,13 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
                 sourceStream.setParallelism(parallelism);
             }
             registerResultTable(pluginConfig, sourceStream);
-            sources.add(sourceStream);
+            sources.add(
+                    new DataStreamTableInfo(
+                            sourceStream,
+                            sourceTableInfo.getCatalogTables().get(0),
+                            pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+                                    ? 
pluginConfig.getString(RESULT_TABLE_NAME.key())
+                                    : null));
         }
         return sources;
     }
@@ -125,31 +139,30 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
     }
 
     @Override
-    protected List<SeaTunnelSource> initializePlugins(
+    protected List<SourceTableInfo> initializePlugins(
             List<URL> jarPaths, List<? extends Config> pluginConfigs) {
         SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
                 new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
-        List<SeaTunnelSource> sources = new ArrayList<>();
+
+        SeaTunnelFactoryDiscovery factoryDiscovery =
+                new SeaTunnelFactoryDiscovery(TableSourceFactory.class, 
ADD_URL_TO_CLASSLOADER);
+
+        List<SourceTableInfo> sources = new ArrayList<>();
         Set<URL> jars = new HashSet<>();
         for (Config sourceConfig : pluginConfigs) {
             PluginIdentifier pluginIdentifier =
                     PluginIdentifier.of(
-                            ENGINE_TYPE, PLUGIN_TYPE, 
sourceConfig.getString(PLUGIN_NAME));
+                            ENGINE_TYPE, PLUGIN_TYPE, 
sourceConfig.getString(PLUGIN_NAME.key()));
             jars.addAll(
                     
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSource seaTunnelSource =
-                    
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
-            seaTunnelSource.prepare(sourceConfig);
-            seaTunnelSource.setJobContext(jobContext);
-            if (jobContext.getJobMode() == JobMode.BATCH
-                    && seaTunnelSource.getBoundedness()
-                            == 
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "'%s' source don't support off-line job.",
-                                seaTunnelSource.getPluginName()));
-            }
-            sources.add(seaTunnelSource);
+            SourceTableInfo source =
+                    PluginUtil.createSource(
+                            factoryDiscovery,
+                            sourcePluginDiscovery,
+                            pluginIdentifier,
+                            sourceConfig,
+                            jobContext);
+            sources.add(source);
         }
         jarPaths.addAll(jars);
         return sources;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 0dc36c62b0..810f6ba32b 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -20,11 +20,15 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
 import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
@@ -35,17 +39,15 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 
-import com.google.common.collect.Lists;
-
 import java.net.URL;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class TransformExecuteProcessor
-        extends FlinkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
 
-    private static final String PLUGIN_TYPE = "transform";
+public class TransformExecuteProcessor
+        extends FlinkAbstractPluginExecuteProcessor<TableTransformFactory> {
 
     protected TransformExecuteProcessor(
             List<URL> jarPaths, List<? extends Config> pluginConfigs, 
JobContext jobContext) {
@@ -53,65 +55,64 @@ public class TransformExecuteProcessor
     }
 
     @Override
-    protected List<SeaTunnelTransform> initializePlugins(
+    protected List<TableTransformFactory> initializePlugins(
             List<URL> jarPaths, List<? extends Config> pluginConfigs) {
         SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
                 new SeaTunnelTransformPluginDiscovery();
-        List<URL> pluginJars = new ArrayList<>();
-        List<SeaTunnelTransform> transforms =
-                pluginConfigs.stream()
-                        .map(
-                                transformConfig -> {
-                                    PluginIdentifier pluginIdentifier =
-                                            PluginIdentifier.of(
-                                                    ENGINE_TYPE,
-                                                    PLUGIN_TYPE,
-                                                    
transformConfig.getString(PLUGIN_NAME));
-                                    List<URL> pluginJarPaths =
-                                            
transformPluginDiscovery.getPluginJarPaths(
-                                                    
Lists.newArrayList(pluginIdentifier));
-                                    SeaTunnelTransform<?> seaTunnelTransform =
-                                            
transformPluginDiscovery.createPluginInstance(
-                                                    pluginIdentifier);
-                                    jarPaths.addAll(pluginJarPaths);
-                                    
seaTunnelTransform.prepare(transformConfig);
-                                    
seaTunnelTransform.setJobContext(jobContext);
-                                    return seaTunnelTransform;
-                                })
-                        .distinct()
-                        .collect(Collectors.toList());
-        jarPaths.addAll(pluginJars);
-        return transforms;
+
+        return pluginConfigs.stream()
+                .map(
+                        transformConfig ->
+                                PluginUtil.createTransformFactory(
+                                        transformPluginDiscovery, 
transformConfig, jarPaths))
+                .distinct()
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams)
+    public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> 
upstreamDataStreams)
             throws TaskExecuteException {
         if (plugins.isEmpty()) {
             return upstreamDataStreams;
         }
-        DataStream<Row> input = upstreamDataStreams.get(0);
-        List<DataStream<Row>> result = new ArrayList<>();
+        DataStreamTableInfo input = upstreamDataStreams.get(0);
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         for (int i = 0; i < plugins.size(); i++) {
             try {
-                SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
                 Config pluginConfig = pluginConfigs.get(i);
-                DataStream<Row> stream = 
fromSourceTable(pluginConfig).orElse(input);
-                SeaTunnelRowType sourceType = initSourceType(pluginConfig, 
stream);
-                transform.setTypeInfo(sourceType);
-                input = flinkTransform(sourceType, transform, stream);
-                stageType(pluginConfig, (SeaTunnelRowType) 
transform.getProducedType());
-                registerResultTable(pluginConfig, input);
-                result.add(input);
+                DataStreamTableInfo stream =
+                        fromSourceTable(pluginConfig, 
upstreamDataStreams).orElse(input);
+                TableTransformFactory factory = plugins.get(i);
+                TableTransformFactoryContext context =
+                        new TableTransformFactoryContext(
+                                
Collections.singletonList(stream.getCatalogTable()),
+                                ReadonlyConfig.fromConfig(pluginConfig),
+                                classLoader);
+                
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
+                SeaTunnelTransform transform = 
factory.createTransform(context).createTransform();
+
+                SeaTunnelRowType sourceType = initSourceType(pluginConfig, 
stream.getDataStream());
+                transform.setJobContext(jobContext);
+                DataStream<Row> inputStream =
+                        flinkTransform(sourceType, transform, 
stream.getDataStream());
+                stageType(pluginConfig, 
transform.getProducedCatalogTable().getSeaTunnelRowType());
+                registerResultTable(pluginConfig, inputStream);
+                upstreamDataStreams.add(
+                        new DataStreamTableInfo(
+                                inputStream,
+                                transform.getProducedCatalogTable(),
+                                pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+                                        ? 
pluginConfig.getString(RESULT_TABLE_NAME.key())
+                                        : null));
             } catch (Exception e) {
                 throw new TaskExecuteException(
                         String.format(
                                 "SeaTunnel transform task: %s execute error",
-                                plugins.get(i).getPluginName()),
+                                plugins.get(i).factoryIdentifier()),
                         e);
             }
         }
-        return result;
+        return upstreamDataStreams;
     }
 
     protected DataStream<Row> flinkTransform(
@@ -119,7 +120,7 @@ public class TransformExecuteProcessor
         TypeInformation rowTypeInfo = 
TypeConverterUtils.convert(transform.getProducedType());
         FlinkRowConverter transformInputRowConverter = new 
FlinkRowConverter(sourceType);
         FlinkRowConverter transformOutputRowConverter =
-                new FlinkRowConverter(transform.getProducedType());
+                new 
FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType());
         DataStream<Row> output =
                 stream.flatMap(
                         new FlatMapFunction<Row, Row>() {
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 503f76b87a..0e3b18fb7d 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -21,13 +21,20 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
 import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -35,15 +42,16 @@ import 
org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
-import com.google.common.collect.Lists;
-
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
 public class SinkExecuteProcessor
-        extends SparkAbstractPluginExecuteProcessor<SeaTunnelSink<?, ?, ?, ?>> 
{
+        extends SparkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
     private static final String PLUGIN_TYPE = PluginType.SINK.getType();
 
     protected SinkExecuteProcessor(
@@ -54,29 +62,21 @@ public class SinkExecuteProcessor
     }
 
     @Override
-    protected List<SeaTunnelSink<?, ?, ?, ?>> initializePlugins(
+    protected List<Optional<? extends Factory>> initializePlugins(
             List<? extends Config> pluginConfigs) {
+        SeaTunnelFactoryDiscovery factoryDiscovery =
+                new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
         List<URL> pluginJars = new ArrayList<>();
-        List<SeaTunnelSink<?, ?, ?, ?>> sinks =
+        List<Optional<? extends Factory>> sinks =
                 pluginConfigs.stream()
                         .map(
-                                sinkConfig -> {
-                                    PluginIdentifier pluginIdentifier =
-                                            PluginIdentifier.of(
-                                                    ENGINE_TYPE,
-                                                    PLUGIN_TYPE,
-                                                    
sinkConfig.getString(PLUGIN_NAME));
-                                    pluginJars.addAll(
-                                            
sinkPluginDiscovery.getPluginJarPaths(
-                                                    
Lists.newArrayList(pluginIdentifier)));
-                                    SeaTunnelSink<?, ?, ?, ?> seaTunnelSink =
-                                            
sinkPluginDiscovery.createPluginInstance(
-                                                    pluginIdentifier);
-                                    seaTunnelSink.prepare(sinkConfig);
-                                    seaTunnelSink.setJobContext(jobContext);
-                                    return seaTunnelSink;
-                                })
+                                sinkConfig ->
+                                        PluginUtil.createSinkFactory(
+                                                factoryDiscovery,
+                                                sinkPluginDiscovery,
+                                                sinkConfig,
+                                                pluginJars))
                         .distinct()
                         .collect(Collectors.toList());
         sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -84,14 +84,17 @@ public class SinkExecuteProcessor
     }
 
     @Override
-    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
+    public List<DatasetTableInfo> execute(List<DatasetTableInfo> 
upstreamDataStreams)
             throws TaskExecuteException {
-        Dataset<Row> input = upstreamDataStreams.get(0);
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        DatasetTableInfo input = upstreamDataStreams.get(0);
         for (int i = 0; i < plugins.size(); i++) {
             Config sinkConfig = pluginConfigs.get(i);
-            SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
-            Dataset<Row> dataset =
-                    fromSourceTable(sinkConfig, 
sparkRuntimeEnvironment).orElse(input);
+            DatasetTableInfo datasetTableInfo =
+                    fromSourceTable(sinkConfig, sparkRuntimeEnvironment, 
upstreamDataStreams)
+                            .orElse(input);
+            Dataset<Row> dataset = datasetTableInfo.getDataset();
             int parallelism;
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
@@ -104,19 +107,64 @@ public class SinkExecuteProcessor
                                         
CommonOptions.PARALLELISM.defaultValue());
             }
             
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), 
parallelism);
+            Optional<? extends Factory> factory = plugins.get(i);
+            boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
+            SeaTunnelSink sink;
+            if (fallBack) {
+                sink =
+                        fallbackCreateSink(
+                                sinkPluginDiscovery,
+                                PluginIdentifier.of(
+                                        ENGINE_TYPE,
+                                        PLUGIN_TYPE,
+                                        
sinkConfig.getString(PLUGIN_NAME.key())),
+                                sinkConfig);
+                sink.setJobContext(jobContext);
+                sink.setTypeInfo((SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
+            } else {
+                TableSinkFactoryContext context =
+                        new TableSinkFactoryContext(
+                                datasetTableInfo.getCatalogTable(),
+                                ReadonlyConfig.fromConfig(sinkConfig),
+                                classLoader);
+                
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+                sink = ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
+                sink.setJobContext(jobContext);
+            }
             // TODO modify checkpoint location
-            seaTunnelSink.setTypeInfo(
-                    (SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
-            if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
-                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) 
seaTunnelSink;
+            if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
                 DataSaveMode dataSaveMode = 
saveModeSink.getUserConfigSaveMode();
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
-            SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
+            SparkSinkInjector.inject(dataset.write(), sink)
                     .option("checkpointLocation", "/tmp")
                     .save();
         }
         // the sink is the last stream
         return null;
     }
+
+    public boolean isFallback(Factory factory) {
+        try {
+            ((TableSinkFactory) factory).createSink(null);
+        } catch (Exception e) {
+            if (e instanceof UnsupportedOperationException
+                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
+                            .equals(e.getMessage())) {
+                return true;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public SeaTunnelSink fallbackCreateSink(
+            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+            PluginIdentifier pluginIdentifier,
+            Config pluginConfig) {
+        SeaTunnelSink source = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+        source.prepare(pluginConfig);
+        return source;
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/DatasetTableInfo.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/DatasetTableInfo.java
new file mode 100644
index 0000000000..3af8a5ad6d
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/DatasetTableInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.spark.execution;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class DatasetTableInfo {
+
+    private Dataset<Row> dataset;
+
+    private CatalogTable catalogTable;
+
+    private String tableName;
+}
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 8afffe1add..c85a10389d 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -21,13 +21,20 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
 import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -36,15 +43,16 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 
-import com.google.common.collect.Lists;
-
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
 public class SinkExecuteProcessor
-        extends SparkAbstractPluginExecuteProcessor<SeaTunnelSink<?, ?, ?, ?>> 
{
+        extends SparkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
     private static final String PLUGIN_TYPE = PluginType.SINK.getType();
 
     protected SinkExecuteProcessor(
@@ -55,29 +63,21 @@ public class SinkExecuteProcessor
     }
 
     @Override
-    protected List<SeaTunnelSink<?, ?, ?, ?>> initializePlugins(
+    protected List<Optional<? extends Factory>> initializePlugins(
             List<? extends Config> pluginConfigs) {
+        SeaTunnelFactoryDiscovery factoryDiscovery =
+                new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
         List<URL> pluginJars = new ArrayList<>();
-        List<SeaTunnelSink<?, ?, ?, ?>> sinks =
+        List<Optional<? extends Factory>> sinks =
                 pluginConfigs.stream()
                         .map(
-                                sinkConfig -> {
-                                    PluginIdentifier pluginIdentifier =
-                                            PluginIdentifier.of(
-                                                    ENGINE_TYPE,
-                                                    PLUGIN_TYPE,
-                                                    
sinkConfig.getString(PLUGIN_NAME));
-                                    pluginJars.addAll(
-                                            
sinkPluginDiscovery.getPluginJarPaths(
-                                                    
Lists.newArrayList(pluginIdentifier)));
-                                    SeaTunnelSink<?, ?, ?, ?> seaTunnelSink =
-                                            
sinkPluginDiscovery.createPluginInstance(
-                                                    pluginIdentifier);
-                                    seaTunnelSink.prepare(sinkConfig);
-                                    seaTunnelSink.setJobContext(jobContext);
-                                    return seaTunnelSink;
-                                })
+                                sinkConfig ->
+                                        PluginUtil.createSinkFactory(
+                                                factoryDiscovery,
+                                                sinkPluginDiscovery,
+                                                sinkConfig,
+                                                new ArrayList<>()))
                         .distinct()
                         .collect(Collectors.toList());
         sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -85,14 +85,17 @@ public class SinkExecuteProcessor
     }
 
     @Override
-    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
+    public List<DatasetTableInfo> execute(List<DatasetTableInfo> 
upstreamDataStreams)
             throws TaskExecuteException {
-        Dataset<Row> input = upstreamDataStreams.get(0);
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        DatasetTableInfo input = upstreamDataStreams.get(0);
         for (int i = 0; i < plugins.size(); i++) {
             Config sinkConfig = pluginConfigs.get(i);
-            SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
-            Dataset<Row> dataset =
-                    fromSourceTable(sinkConfig, 
sparkRuntimeEnvironment).orElse(input);
+            DatasetTableInfo datasetTableInfo =
+                    fromSourceTable(sinkConfig, sparkRuntimeEnvironment, 
upstreamDataStreams)
+                            .orElse(input);
+            Dataset<Row> dataset = datasetTableInfo.getDataset();
             int parallelism;
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
@@ -105,15 +108,37 @@ public class SinkExecuteProcessor
                                         
CommonOptions.PARALLELISM.defaultValue());
             }
             
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), 
parallelism);
+            Optional<? extends Factory> factory = plugins.get(i);
+            boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
+            SeaTunnelSink sink;
+            if (fallBack) {
+                sink =
+                        fallbackCreateSink(
+                                sinkPluginDiscovery,
+                                PluginIdentifier.of(
+                                        ENGINE_TYPE,
+                                        PLUGIN_TYPE,
+                                        
sinkConfig.getString(PLUGIN_NAME.key())),
+                                sinkConfig);
+                sink.setJobContext(jobContext);
+                sink.setTypeInfo((SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
+            } else {
+                TableSinkFactoryContext context =
+                        new TableSinkFactoryContext(
+                                datasetTableInfo.getCatalogTable(),
+                                ReadonlyConfig.fromConfig(sinkConfig),
+                                classLoader);
+                
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
+                sink = ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
+                sink.setJobContext(jobContext);
+            }
             // TODO modify checkpoint location
-            seaTunnelSink.setTypeInfo(
-                    (SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
-            if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
-                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) 
seaTunnelSink;
+            if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
+                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
                 DataSaveMode dataSaveMode = 
saveModeSink.getUserConfigSaveMode();
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
-            SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
+            SparkSinkInjector.inject(dataset.write(), sink)
                     .option("checkpointLocation", "/tmp")
                     .mode(SaveMode.Append)
                     .save();
@@ -121,4 +146,27 @@ public class SinkExecuteProcessor
         // the sink is the last stream
         return null;
     }
+
+    public boolean isFallback(Factory factory) {
+        try {
+            ((TableSinkFactory) factory).createSink(null);
+        } catch (Exception e) {
+            if (e instanceof UnsupportedOperationException
+                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
+                            .equals(e.getMessage())) {
+                return true;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public SeaTunnelSink fallbackCreateSink(
+            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
+            PluginIdentifier pluginIdentifier,
+            Config pluginConfig) {
+        SeaTunnelSink source = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+        source.prepare(pluginConfig);
+        return source;
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index d68aec3c23..126f14edb1 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -22,9 +22,13 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
+import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
 
@@ -40,8 +44,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-public class SourceExecuteProcessor
-        extends SparkAbstractPluginExecuteProcessor<SeaTunnelSource<?, ?, ?>> {
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+
+@SuppressWarnings("rawtypes")
+public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<SourceTableInfo> {
     private static final String PLUGIN_TYPE = "source";
 
     public SourceExecuteProcessor(
@@ -52,10 +59,11 @@ public class SourceExecuteProcessor
     }
 
     @Override
-    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
-        List<Dataset<Row>> sources = new ArrayList<>();
+    public List<DatasetTableInfo> execute(List<DatasetTableInfo> 
upstreamDataStreams) {
+        List<DatasetTableInfo> sources = new ArrayList<>();
         for (int i = 0; i < plugins.size(); i++) {
-            SeaTunnelSource<?, ?, ?> source = plugins.get(i);
+            SourceTableInfo sourceTableInfo = plugins.get(i);
+            SeaTunnelSource<?, ?, ?> source = sourceTableInfo.getSource();
             Config pluginConfig = pluginConfigs.get(i);
             int parallelism;
             if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
@@ -81,29 +89,40 @@ public class SourceExecuteProcessor
                                     (StructType)
                                             
TypeConverterUtils.convert(source.getProducedType()))
                             .load();
-            sources.add(dataset);
+            sources.add(
+                    new DatasetTableInfo(
+                            dataset,
+                            sourceTableInfo.getCatalogTables().get(0),
+                            pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+                                    ? 
pluginConfig.getString(RESULT_TABLE_NAME.key())
+                                    : null));
             registerInputTempView(pluginConfigs.get(i), dataset);
         }
         return sources;
     }
 
     @Override
-    protected List<SeaTunnelSource<?, ?, ?>> initializePlugins(
-            List<? extends Config> pluginConfigs) {
+    protected List<SourceTableInfo> initializePlugins(List<? extends Config> 
pluginConfigs) {
         SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new 
SeaTunnelSourcePluginDiscovery();
-        List<SeaTunnelSource<?, ?, ?>> sources = new ArrayList<>();
+        SeaTunnelFactoryDiscovery factoryDiscovery =
+                new SeaTunnelFactoryDiscovery(TableSourceFactory.class);
+
+        List<SourceTableInfo> sources = new ArrayList<>();
         Set<URL> jars = new HashSet<>();
         for (Config sourceConfig : pluginConfigs) {
             PluginIdentifier pluginIdentifier =
                     PluginIdentifier.of(
-                            ENGINE_TYPE, PLUGIN_TYPE, 
sourceConfig.getString(PLUGIN_NAME));
+                            ENGINE_TYPE, PLUGIN_TYPE, 
sourceConfig.getString(PLUGIN_NAME.key()));
             jars.addAll(
                     
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSource<?, ?, ?> seaTunnelSource =
-                    
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
-            seaTunnelSource.prepare(sourceConfig);
-            seaTunnelSource.setJobContext(jobContext);
-            sources.add(seaTunnelSource);
+            SourceTableInfo source =
+                    PluginUtil.createSource(
+                            factoryDiscovery,
+                            sourcePluginDiscovery,
+                            pluginIdentifier,
+                            sourceConfig,
+                            jobContext);
+            sources.add(source);
         }
         sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars));
         return sources;
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
index ebfcaf6e91..a431f23dd1 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
 
 import org.apache.spark.sql.Dataset;
@@ -28,15 +29,15 @@ import org.apache.spark.sql.Row;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+
 public abstract class SparkAbstractPluginExecuteProcessor<T>
-        implements PluginExecuteProcessor<Dataset<Row>, 
SparkRuntimeEnvironment> {
+        implements PluginExecuteProcessor<DatasetTableInfo, 
SparkRuntimeEnvironment> {
     protected SparkRuntimeEnvironment sparkRuntimeEnvironment;
     protected final List<? extends Config> pluginConfigs;
     protected final JobContext jobContext;
     protected final List<T> plugins;
     protected static final String ENGINE_TYPE = "seatunnel";
-    protected static final String PLUGIN_NAME = "plugin_name";
-    protected static final String RESULT_TABLE_NAME = "result_table_name";
     protected static final String SOURCE_TABLE_NAME = "source_table_name";
 
     protected SparkAbstractPluginExecuteProcessor(
@@ -57,19 +58,34 @@ public abstract class SparkAbstractPluginExecuteProcessor<T>
     protected abstract List<T> initializePlugins(List<? extends Config> 
pluginConfigs);
 
     protected void registerInputTempView(Config pluginConfig, Dataset<Row> 
dataStream) {
-        if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
-            String tableName = pluginConfig.getString(RESULT_TABLE_NAME);
+        if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
+            String tableName = pluginConfig.getString(RESULT_TABLE_NAME.key());
             registerTempView(tableName, dataStream);
         }
     }
 
-    protected Optional<Dataset<Row>> fromSourceTable(
-            Config pluginConfig, SparkRuntimeEnvironment 
sparkRuntimeEnvironment) {
+    protected Optional<DatasetTableInfo> fromSourceTable(
+            Config pluginConfig,
+            SparkRuntimeEnvironment sparkRuntimeEnvironment,
+            List<DatasetTableInfo> upstreamDataStreams) {
         if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
             return Optional.empty();
         }
         String sourceTableName = pluginConfig.getString(SOURCE_TABLE_NAME);
-        return 
Optional.of(sparkRuntimeEnvironment.getSparkSession().read().table(sourceTableName));
+        DatasetTableInfo datasetTableInfo =
+                upstreamDataStreams.stream()
+                        .filter(info -> 
sourceTableName.equals(info.getTableName()))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new SeaTunnelException(
+                                                String.format(
+                                                        "table %s not found", 
sourceTableName)));
+        return Optional.of(
+                new DatasetTableInfo(
+                        
sparkRuntimeEnvironment.getSparkSession().read().table(sourceTableName),
+                        datasetTableInfo.getCatalogTable(),
+                        sourceTableName));
     }
 
     private void registerTempView(String tableName, Dataset<Row> ds) {
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index dab00cbdb4..48aa9abb2a 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -27,9 +27,6 @@ import 
org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
 import org.apache.seatunnel.core.starter.execution.TaskExecution;
 
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
@@ -39,11 +36,11 @@ import java.util.List;
 @Slf4j
 public class SparkExecution implements TaskExecution {
     private final SparkRuntimeEnvironment sparkRuntimeEnvironment;
-    private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
+    private final PluginExecuteProcessor<DatasetTableInfo, 
SparkRuntimeEnvironment>
             sourcePluginExecuteProcessor;
-    private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
+    private final PluginExecuteProcessor<DatasetTableInfo, 
SparkRuntimeEnvironment>
             transformPluginExecuteProcessor;
-    private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
+    private final PluginExecuteProcessor<DatasetTableInfo, 
SparkRuntimeEnvironment>
             sinkPluginExecuteProcessor;
 
     public SparkExecution(Config config) {
@@ -68,7 +65,7 @@ public class SparkExecution implements TaskExecution {
 
     @Override
     public void execute() throws TaskExecuteException {
-        List<Dataset<Row>> datasets = new ArrayList<>();
+        List<DatasetTableInfo> datasets = new ArrayList<>();
         datasets = sourcePluginExecuteProcessor.execute(datasets);
         datasets = transformPluginExecuteProcessor.execute(datasets);
         sinkPluginExecuteProcessor.execute(datasets);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index fc9be55925..da76e027db 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -20,11 +20,15 @@ package org.apache.seatunnel.core.starter.spark.execution;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 import 
org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
 import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -37,22 +41,22 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
 import org.apache.spark.sql.types.StructType;
 
-import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+
 @Slf4j
 public class TransformExecuteProcessor
-        extends SparkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
-
-    private static final String PLUGIN_TYPE = "transform";
+        extends SparkAbstractPluginExecuteProcessor<TableTransformFactory> {
 
     protected TransformExecuteProcessor(
             SparkRuntimeEnvironment sparkRuntimeEnvironment,
@@ -62,29 +66,18 @@ public class TransformExecuteProcessor
     }
 
     @Override
-    protected List<SeaTunnelTransform> initializePlugins(List<? extends 
Config> pluginConfigs) {
+    protected List<TableTransformFactory> initializePlugins(List<? extends 
Config> pluginConfigs) {
         SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
                 new SeaTunnelTransformPluginDiscovery();
         List<URL> pluginJars = new ArrayList<>();
-        List<SeaTunnelTransform> transforms =
+        List<TableTransformFactory> transforms =
                 pluginConfigs.stream()
                         .map(
-                                transformConfig -> {
-                                    PluginIdentifier pluginIdentifier =
-                                            PluginIdentifier.of(
-                                                    ENGINE_TYPE,
-                                                    PLUGIN_TYPE,
-                                                    
transformConfig.getString(PLUGIN_NAME));
-                                    pluginJars.addAll(
-                                            
transformPluginDiscovery.getPluginJarPaths(
-                                                    
Lists.newArrayList(pluginIdentifier)));
-                                    SeaTunnelTransform pluginInstance =
-                                            
transformPluginDiscovery.createPluginInstance(
-                                                    pluginIdentifier);
-                                    pluginInstance.prepare(transformConfig);
-                                    pluginInstance.setJobContext(jobContext);
-                                    return pluginInstance;
-                                })
+                                transformConfig ->
+                                        PluginUtil.createTransformFactory(
+                                                transformPluginDiscovery,
+                                                transformConfig,
+                                                pluginJars))
                         .distinct()
                         .collect(Collectors.toList());
         sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -92,31 +85,46 @@ public class TransformExecuteProcessor
     }
 
     @Override
-    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
+    public List<DatasetTableInfo> execute(List<DatasetTableInfo> 
upstreamDataStreams)
             throws TaskExecuteException {
         if (plugins.isEmpty()) {
             return upstreamDataStreams;
         }
-        Dataset<Row> input = upstreamDataStreams.get(0);
-        List<Dataset<Row>> result = new ArrayList<>();
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        DatasetTableInfo input = upstreamDataStreams.get(0);
         for (int i = 0; i < plugins.size(); i++) {
             try {
-                SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
                 Config pluginConfig = pluginConfigs.get(i);
-                Dataset<Row> stream =
-                        fromSourceTable(pluginConfig, 
sparkRuntimeEnvironment).orElse(input);
-                input = sparkTransform(transform, stream);
-                registerInputTempView(pluginConfig, input);
-                result.add(input);
+                DatasetTableInfo dataset =
+                        fromSourceTable(pluginConfig, sparkRuntimeEnvironment, 
upstreamDataStreams)
+                                .orElse(input);
+                TableTransformFactory factory = plugins.get(i);
+                TableTransformFactoryContext context =
+                        new TableTransformFactoryContext(
+                                
Collections.singletonList(dataset.getCatalogTable()),
+                                ReadonlyConfig.fromConfig(pluginConfig),
+                                classLoader);
+                
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
+                SeaTunnelTransform transform = 
factory.createTransform(context).createTransform();
+
+                Dataset<Row> inputDataset = sparkTransform(transform, 
dataset.getDataset());
+                registerInputTempView(pluginConfig, inputDataset);
+                upstreamDataStreams.add(
+                        new DatasetTableInfo(
+                                inputDataset,
+                                transform.getProducedCatalogTable(),
+                                pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+                                        ? 
pluginConfig.getString(RESULT_TABLE_NAME.key())
+                                        : null));
             } catch (Exception e) {
                 throw new TaskExecuteException(
                         String.format(
                                 "SeaTunnel transform task: %s execute error",
-                                plugins.get(i).getPluginName()),
+                                plugins.get(i).factoryIdentifier()),
                         e);
             }
         }
-        return result;
+        return upstreamDataStreams;
     }
 
     private Dataset<Row> sparkTransform(SeaTunnelTransform transform, 
Dataset<Row> stream)
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
index a1b8ec87e7..fe1185c960 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
@@ -45,7 +45,7 @@ import static 
org.apache.seatunnel.api.common.CommonOptions.FACTORY_ID;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
 import static org.apache.seatunnel.api.common.CommonOptions.SOURCE_TABLE_NAME;
-import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.DEFAULT_ID;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
 
 @Slf4j
 public final class ConfigParserUtil {
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 46c28b6397..de1449b0b4 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -54,9 +55,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.DEFAULT_ID;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
 import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
-import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.ensureJobModeMatch;
 import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode;
 
 @Data
@@ -84,7 +84,7 @@ public class JobConfigParser {
         // old logic: prepare(initialization) -> set job context
         source.prepare(config);
         source.setJobContext(jobConfig.getJobContext());
-        ensureJobModeMatch(jobConfig.getJobContext(), source);
+        PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
         String actionName =
                 createSourceActionName(
                         0, config.getString(CollectionConstants.PLUGIN_NAME), 
getTableName(config));
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index d38b5aacb7..c501cb9a1a 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.engine.core.parse;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.sink.DataSaveMode;
@@ -40,7 +39,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -58,8 +57,6 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
 import lombok.extern.slf4j.Slf4j;
 import scala.Tuple2;
 
@@ -82,6 +79,7 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
 import static 
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId;
 import static 
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryUrls;
 import static 
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputIds;
@@ -89,10 +87,6 @@ import static 
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputId
 @Slf4j
 public class MultipleTableJobConfigParser {
 
-    private static final ILogger LOGGER = 
Logger.getLogger(MultipleTableJobConfigParser.class);
-
-    static final String DEFAULT_ID = "default-identifier";
-
     private final IdGenerator idGenerator;
     private final JobConfig jobConfig;
 
@@ -320,7 +314,7 @@ public class MultipleTableJobConfigParser {
                     JobConfigParser.createSourceActionName(configIndex, 
factoryId, tableId);
             SeaTunnelSource<Object, SourceSplit, Serializable> source = 
tuple2._1();
             source.setJobContext(jobConfig.getJobContext());
-            ensureJobModeMatch(jobConfig.getJobContext(), source);
+            PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
             SourceAction<Object, SourceSplit, Serializable> action =
                     new SourceAction<>(id, actionName, tuple2._1(), 
factoryUrls);
             action.setParallelism(parallelism);
@@ -331,16 +325,6 @@ public class MultipleTableJobConfigParser {
         return new Tuple2<>(tableId, actions);
     }
 
-    public static void ensureJobModeMatch(JobContext jobContext, 
SeaTunnelSource source) {
-        if (jobContext.getJobMode() == JobMode.BATCH
-                && source.getBoundedness()
-                        == 
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
-            throw new JobDefineCheckException(
-                    String.format(
-                            "'%s' source don't support off-line job.", 
source.getPluginName()));
-        }
-    }
-
     public void parseTransforms(
             List<? extends Config> transformConfigs,
             ClassLoader classLoader,
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index a2006f7dc9..5042ecfbf0 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -166,22 +166,24 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
         return pluginIdentifiers;
     }
 
-    public Path getPluginDir() {
-        return pluginDir;
-    }
-
     @Override
     public T createPluginInstance(PluginIdentifier pluginIdentifier) {
         return (T) createPluginInstance(pluginIdentifier, 
Collections.EMPTY_LIST);
     }
 
     @Override
-    public T createPluginInstance(PluginIdentifier pluginIdentifier, 
Collection<URL> pluginJars) {
+    public Optional<T> createOptionalPluginInstance(PluginIdentifier 
pluginIdentifier) {
+        return createOptionalPluginInstance(pluginIdentifier, 
Collections.EMPTY_LIST);
+    }
+
+    @Override
+    public Optional<T> createOptionalPluginInstance(
+            PluginIdentifier pluginIdentifier, Collection<URL> pluginJars) {
         ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         T pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
         if (pluginInstance != null) {
             log.info("Load plugin: {} from classpath", pluginIdentifier);
-            return pluginInstance;
+            return Optional.of(pluginInstance);
         }
         Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
         // if the plugin jar not exist in classpath, will load from plugin dir.
@@ -214,9 +216,18 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
                         pluginIdentifier,
                         pluginJarPath.get(),
                         classLoader.getClass().getName());
-                return pluginInstance;
+                return Optional.of(pluginInstance);
             }
         }
+        return Optional.empty();
+    }
+
+    @Override
+    public T createPluginInstance(PluginIdentifier pluginIdentifier, 
Collection<URL> pluginJars) {
+        Optional<T> instance = createOptionalPluginInstance(pluginIdentifier, 
pluginJars);
+        if (instance.isPresent()) {
+            return instance.get();
+        }
         throw new RuntimeException("Plugin " + pluginIdentifier + " not 
found.");
     }
 
@@ -286,7 +297,7 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
         return plugins;
     }
 
-    private T loadPluginInstance(PluginIdentifier pluginIdentifier, 
ClassLoader classLoader) {
+    protected T loadPluginInstance(PluginIdentifier pluginIdentifier, 
ClassLoader classLoader) {
         ServiceLoader<T> serviceLoader = 
ServiceLoader.load(getPluginBaseClass(), classLoader);
         for (T t : serviceLoader) {
             if (t instanceof PluginIdentifierInterface) {
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index 728ae15bab..3d45e719b2 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.plugin.discovery;
 import java.net.URL;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Plugins discovery interface, used to find plugin. Each plugin type should 
have its own
@@ -53,6 +54,24 @@ public interface PluginDiscovery<T> {
      */
     T createPluginInstance(PluginIdentifier pluginIdentifier, Collection<URL> 
pluginJars);
 
+    /**
+     * Get plugin instance by plugin identifier.
+     *
+     * @param pluginIdentifier plugin identifier.
+     * @return plugin instance. If not found, return Optional.empty().
+     */
+    Optional<T> createOptionalPluginInstance(PluginIdentifier 
pluginIdentifier);
+
+    /**
+     * Get plugin instance by plugin identifier.
+     *
+     * @param pluginIdentifier plugin identifier.
+     * @param pluginJars used to help plugin load
+     * @return plugin instance. If not found, return Optional.empty().
+     */
+    Optional<T> createOptionalPluginInstance(
+            PluginIdentifier pluginIdentifier, Collection<URL> pluginJars);
+
     /**
      * Get all plugin instances.
      *
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
new file mode 100644
index 0000000000..9fe8717488
--- /dev/null
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.seatunnel;
+
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URL;
+import java.util.ServiceLoader;
+import java.util.function.BiConsumer;
+
+public class SeaTunnelFactoryDiscovery extends 
AbstractPluginDiscovery<Factory> {
+
+    private final Class<? extends Factory> factoryClass;
+
+    public SeaTunnelFactoryDiscovery(Class<? extends Factory> factoryClass) {
+        super();
+        this.factoryClass = factoryClass;
+    }
+
+    public SeaTunnelFactoryDiscovery(
+            Class<? extends Factory> factoryClass,
+            BiConsumer<ClassLoader, URL> addURLToClassLoader) {
+        super(addURLToClassLoader);
+        this.factoryClass = factoryClass;
+    }
+
+    @Override
+    protected Class<Factory> getPluginBaseClass() {
+        return Factory.class;
+    }
+
+    @Override
+    protected Factory loadPluginInstance(
+            PluginIdentifier pluginIdentifier, ClassLoader classLoader) {
+        ServiceLoader<Factory> serviceLoader =
+                ServiceLoader.load(getPluginBaseClass(), classLoader);
+        for (Factory factory : serviceLoader) {
+            if (factoryClass.isInstance(factory)) {
+                String factoryIdentifier = factory.factoryIdentifier();
+                String pluginName = pluginIdentifier.getPluginName();
+                if (StringUtils.equalsIgnoreCase(factoryIdentifier, 
pluginName)) {
+                    return factory;
+                }
+            }
+        }
+        return null;
+    }
+}

Reply via email to