This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7eb0547c2 [feature][DAG] Support for mixing Factory and Plugin SPI (#4359)
7eb0547c2 is described below

commit 7eb0547c25188c0bf206bfa8294fc54f8188cacf
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Mar 21 09:56:01 2023 +0800

    [feature][DAG] Support for mixing Factory and Plugin SPI (#4359)
    
    * [feature][api] support RowType convert to CatalogTable
    
    * [feature][DAG] support check the correctness of the DAG
    
    * [feature][DAG] Support for mixing Factory and Plugin SPI
---
 .../apache/seatunnel/api/common/CommonOptions.java |   6 +-
 .../apache/seatunnel/api/env/EnvCommonOptions.java |   6 -
 .../SourceOptions.java}                            |  28 +-
 .../api/table/catalog/CatalogTableUtil.java        |  27 +-
 .../api/table/factory/TableSinkFactory.java        |   3 +-
 .../api/table/factory/TableSourceFactory.java      |   3 +-
 .../api/table/factory/TableTransformFactory.java   |   3 +-
 .../batch_fakesource_to_file_complex.conf          |   4 +-
 .../streaming_fakesource_to_file_complex.conf      |   4 +-
 .../engine/client/LogicalDagGeneratorTest.java     |   6 +-
 ....java => MultipleTableJobConfigParserTest.java} |  36 +-
 .../batch_fakesource_to_file_complex.conf          |   4 +-
 .../src/test/resources/client_test.conf            |   4 +-
 .../engine/core/parse/ConfigParserUtil.java        | 290 ++++++++++
 .../engine/core/parse/ConnectorInstanceLoader.java |  21 -
 .../engine/core/parse/JobConfigParser.java         | 591 +++++----------------
 .../core/parse/MultipleTableJobConfigParser.java   | 451 ++++++++++++----
 .../apache/seatunnel/engine/server/TestUtils.java  |   4 +-
 .../server/master/JobHistoryServiceTest.java       |   6 +-
 .../engine/server/master/JobMetricsTest.java       |  41 +-
 .../src/test/resources/fake_to_console.conf        |   4 +-
 21 files changed, 870 insertions(+), 672 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
index fff669a5a..5b6f404cc 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.api.common;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
+import java.util.List;
+
 public interface CommonOptions {
     Option<String> FACTORY_ID =
             Options.key("factory")
@@ -47,9 +49,9 @@ public interface CommonOptions {
                                     + "The data set (dataStream/dataset) registered here can be directly accessed by other plugins "
                                     + "by specifying source_table_name .");
 
-    Option<String> SOURCE_TABLE_NAME =
+    Option<List<String>> SOURCE_TABLE_NAME =
             Options.key("source_table_name")
-                    .stringType()
+                    .listType()
                     .noDefaultValue()
                     .withDescription(
                             "When source_table_name is not specified, "
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index eca9fda24..de3226b1c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -44,12 +44,6 @@ public interface EnvCommonOptions {
                     .noDefaultValue()
                     .withDescription("The job mode of this job, support Batch and Stream");
 
-    Option<ParsingMode> DAG_PARSING_MODE =
-            Options.key("dag-parsing.mode")
-                    .enumType(ParsingMode.class)
-                    .defaultValue(ParsingMode.SINGLENESS)
-                    .withDescription("Whether to enable parsing support for multi-table jobs");
-
     Option<Long> CHECKPOINT_INTERVAL =
             Options.key("checkpoint.interval")
                     .longType()
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceOptions.java
similarity index 54%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceOptions.java
index 235392b67..85a8b31a8 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceOptions.java
@@ -15,24 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.api.source;
 
-import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.env.ParsingMode;
 
-/**
- * This is an SPI interface, used to create {@link
- * org.apache.seatunnel.api.table.connector.TableTransform}. Each plugin need to have it own
- * implementation.
- */
-public interface TableTransformFactory extends Factory {
-
-    /**
-     * We will never use this method now. So gave a default implement and return null.
-     *
-     * @param context TableFactoryContext
-     * @return
-     */
-    default <T> TableTransform<T> createTransform(TableFactoryContext context) {
-        throw new UnsupportedOperationException("unsupported now");
-    }
+public interface SourceOptions {
+    Option<ParsingMode> DAG_PARSING_MODE =
+            Options.key("dag-parsing.mode")
+                    .enumType(ParsingMode.class)
+                    .defaultValue(ParsingMode.SINGLENESS)
+                    .withDescription("Whether to enable parsing support for multi-table source");
 }
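
The dag-parsing.mode option thus moves from the env scope (removed from
EnvCommonOptions above) into the source scope. A hedged sketch of a job config
using it; SINGLENESS is the default shown in this patch, and the placement
inside the source plugin block is an assumption:

    source {
      FakeSource {
        result_table_name = "fake"
        # was an env-level option; other ParsingMode values are not shown
        # in this patch
        dag-parsing.mode = "SINGLENESS"
      }
    }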
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 00603e3f5..a6fa269ab 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -76,6 +76,23 @@ public class CatalogTableUtil implements Serializable {
         this.catalogTable = catalogTable;
     }
 
+    @Deprecated
+    public static CatalogTable getCatalogTable(String tableName, SeaTunnelRowType rowType) {
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        for (int i = 0; i < rowType.getTotalFields(); i++) {
+            PhysicalColumn column =
+                    PhysicalColumn.of(
+                            rowType.getFieldName(i), rowType.getFieldType(i), 0, true, null, null);
+            schemaBuilder.column(column);
+        }
+        return CatalogTable.of(
+                TableIdentifier.of("schema", "default", tableName),
+                schemaBuilder.build(),
+                new HashMap<>(),
+                new ArrayList<>(),
+                "It is converted from RowType and only has column information.");
+    }
+
     public static List<CatalogTable> getCatalogTables(Config config, ClassLoader classLoader) {
         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
         Map<String, String> catalogOptions =
@@ -103,11 +120,13 @@ public class CatalogTableUtil implements Serializable {
                         catalogConfig,
                         classLoader,
                         factoryId);
-        if (!optionalCatalog.isPresent()) {
-            return Collections.emptyList();
-        }
+        return optionalCatalog
+                .map(catalog -> getCatalogTables(catalogConfig, catalog))
+                .orElse(Collections.emptyList());
+    }
 
-        Catalog catalog = optionalCatalog.get();
+    public static List<CatalogTable> getCatalogTables(
+            ReadonlyConfig catalogConfig, Catalog catalog) {
         // Get the list of specified tables
         List<String> tableNames = catalogConfig.get(CatalogOptions.TABLE_NAMES);
         List<CatalogTable> catalogTables = new ArrayList<>();
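
A minimal sketch of the new (deprecated) RowType-to-CatalogTable bridge added
above; field names and types are illustrative only:

    import org.apache.seatunnel.api.table.catalog.CatalogTable;
    import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
    import org.apache.seatunnel.api.table.type.BasicType;
    import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
    import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

    public class RowTypeBridgeSketch {
        public static void main(String[] args) {
            SeaTunnelRowType rowType =
                    new SeaTunnelRowType(
                            new String[] {"name", "age"},
                            new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE, BasicType.INT_TYPE});
            // Yields a CatalogTable that "is converted from RowType and only has
            // column information", per the description in the diff above.
            CatalogTable table = CatalogTableUtil.getCatalogTable("fake", rowType);
            System.out.println(table.getTableId());
        }
    }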
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 4fb2981f8..f0015fa58 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -38,6 +38,7 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT
      */
     default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(
             TableFactoryContext context) {
-        throw new UnsupportedOperationException("unsupported now");
+        throw new UnsupportedOperationException(
+                "The Factory has not been implemented and the deprecated Plugin will be used.");
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 1b600fd45..30f70efde 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -36,7 +36,8 @@ public interface TableSourceFactory extends Factory {
      */
     default <T, SplitT extends SourceSplit, StateT extends Serializable>
             TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
-        throw new UnsupportedOperationException("unsupported now");
+        throw new UnsupportedOperationException(
+                "The Factory has not been implemented and the deprecated Plugin will be used.");
     }
 
     /**
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
index 235392b67..33caf328d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
@@ -33,6 +33,7 @@ public interface TableTransformFactory extends Factory {
      * @return
      */
     default <T> TableTransform<T> createTransform(TableFactoryContext context) {
-        throw new UnsupportedOperationException("unsupported now");
+        throw new UnsupportedOperationException(
+                "The Factory has not been implemented and the deprecated Plugin will be used.");
     }
 }
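
All three factory defaults now throw the message above, which is the hook that
lets the engine mix the two SPIs: try the Factory first, and fall back to the
deprecated Plugin SPI when the connector has not implemented it. A generic
sketch of that contract (not the parser's actual code; both suppliers are
hypothetical stand-ins, e.g. () -> factory.createSource(context)):

    import java.util.function.Supplier;

    public final class FactoryOrPluginSketch {
        // Prefer the Factory SPI; if the default method throws
        // UnsupportedOperationException, use the deprecated Plugin SPI instead.
        public static <T> T createOrFallback(Supplier<T> factoryPath, Supplier<T> pluginPath) {
            try {
                return factoryPath.get();
            } catch (UnsupportedOperationException e) {
                return pluginPath.get();
            }
        }
    }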
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index 5bbd38ada..3d6ca5054 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -80,7 +80,7 @@ source {
       array.size = 10
       bytes.length = 10
       string.length = 10
-      result_table_name = "fake"
+      result_table_name = "fake2"
       schema = {
         fields {
           c_map = "map<string, array<int>>"
@@ -136,6 +136,6 @@ sink {
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error",
-    source_table_name="fake"
+    source_table_name=["fake","fake2"]
   }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index 96daf1365..15eb0c429 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -79,7 +79,7 @@ source {
       array.size = 10
       bytes.length = 10
       string.length = 10
-      result_table_name = "fake"
+      result_table_name = "fake2"
       parallelism = 1
       schema = {
         fields {
@@ -136,6 +136,6 @@ sink {
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error",
-    source_table_name="fake"
+    source_table_name=["fake","fake2"]
   }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index b7a2b0383..7b9ea063c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -25,7 +25,7 @@ import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
-import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
@@ -49,14 +49,14 @@ public class LogicalDagGeneratorTest {
 
         IdGenerator idGenerator = new IdGenerator();
         ImmutablePair<List<Action>, Set<URL>> immutablePair =
-                new JobConfigParser(filePath, idGenerator, jobConfig).parse();
+                new MultipleTableJobConfigParser(filePath, idGenerator, jobConfig).parse();
 
         LogicalDagGenerator logicalDagGenerator =
                 new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator);
         LogicalDag logicalDag = logicalDagGenerator.generate();
         JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
         String result =
-                "{\"vertices\":[{\"id\":2,\"name\":\"Source[0]-FakeSource-fake(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Source[1]-FakeSource-fake(id=3)\",\"parallelism\":3},{\"id\":1,\"name\":\"Sink[0]-LocalFile-fake(id=1)\",\"parallelism\":6}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"},{\"inputVertex\":\"Source[1]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"}]}";
+                "{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource-fake(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[0]-FakeSource-fake2(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile-default-identifier(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-default-identifier\"},{\"inputVertex\":\"Source[0]-FakeSource-fake2\",\"targetVertex\":\"Sink[0]-LocalFile-default-identifier\"}]}";
         Assertions.assertEquals(result, logicalDagJson.toString());
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
similarity index 74%
rename from seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
rename to seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index e9e59cf55..a806d0a96 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
@@ -31,10 +31,11 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.net.URL;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
-public class JobConfigParserTest {
+public class MultipleTableJobConfigParserTest {
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @Test
@@ -43,15 +44,15 @@ public class JobConfigParserTest {
         String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setJobContext(new JobContext());
-        JobConfigParser jobConfigParser =
-                new JobConfigParser(filePath, new IdGenerator(), jobConfig);
+        MultipleTableJobConfigParser jobConfigParser =
+                new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig);
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
-        Assertions.assertEquals("Sink[0]-LocalFile-default", actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile-default-identifier", actions.get(0).getName());
         Assertions.assertEquals(1, actions.get(0).getUpstream().size());
         Assertions.assertEquals(
-                "Source[0]-FakeSource-default", actions.get(0).getUpstream().get(0).getName());
+                "Source[0]-FakeSource-fake", actions.get(0).getUpstream().get(0).getName());
 
         Assertions.assertEquals(3, actions.get(0).getUpstream().get(0).getParallelism());
         Assertions.assertEquals(3, actions.get(0).getParallelism());
@@ -64,21 +65,28 @@ public class JobConfigParserTest {
         String filePath = TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setJobContext(new JobContext());
-        JobConfigParser jobConfigParser =
-                new JobConfigParser(filePath, new IdGenerator(), jobConfig);
+        MultipleTableJobConfigParser jobConfigParser =
+                new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig);
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
 
-        Assertions.assertEquals("Sink[0]-LocalFile-fake", actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile-default-identifier", actions.get(0).getName());
         Assertions.assertEquals(2, actions.get(0).getUpstream().size());
-        Assertions.assertEquals(
-                "Source[0]-FakeSource-fake", actions.get(0).getUpstream().get(0).getName());
-        Assertions.assertEquals(
-                "Source[1]-FakeSource-fake", actions.get(0).getUpstream().get(1).getName());
+
+        String[] expected = {"Source[0]-FakeSource-fake", "Source[0]-FakeSource-fake2"};
+        String[] actual = {
+            actions.get(0).getUpstream().get(0).getName(),
+            actions.get(0).getUpstream().get(1).getName()
+        };
+
+        Arrays.sort(expected);
+        Arrays.sort(actual);
+
+        Assertions.assertArrayEquals(expected, actual);
 
         Assertions.assertEquals(3, actions.get(0).getUpstream().get(0).getParallelism());
         Assertions.assertEquals(3, actions.get(0).getUpstream().get(1).getParallelism());
-        Assertions.assertEquals(6, actions.get(0).getParallelism());
+        Assertions.assertEquals(3, actions.get(0).getParallelism());
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
index fa79239e0..f4acfb7a5 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -40,7 +40,7 @@ source {
   }
 
   FakeSource {
-    result_table_name = "fake"
+    result_table_name = "fake2"
     schema = {
       fields {
         name = "string"
@@ -68,6 +68,6 @@ sink {
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error",
-    source_table_name="fake"
+    source_table_name=["fake","fake2"]
   }
 }
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
index 4e345cf96..92e159c2a 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -40,7 +40,7 @@ source {
   }
 
   FakeSource {
-    result_table_name = "fake"
+    result_table_name = "fake2"
     schema = {
       fields {
         name = "string"
@@ -68,6 +68,6 @@ sink {
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error",
-    source_table_name="fake"
+    source_table_name="fake,fake2"
   }
 }
\ No newline at end of file
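
Note the two spellings exercised by the updated test resources: the complex
configs use the explicit list form, while client_test.conf keeps a single
comma-separated string, presumably parsed into the same list value:

    # explicit list form
    source_table_name = ["fake", "fake2"]
    # comma-separated string form, as in client_test.conf
    source_table_name = "fake,fake2"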
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
new file mode 100644
index 000000000..3ffdc9586
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.parse;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import scala.Tuple2;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.seatunnel.api.common.CommonOptions.FACTORY_ID;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+import static org.apache.seatunnel.api.common.CommonOptions.SOURCE_TABLE_NAME;
+import static org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.DEFAULT_ID;
+
+@Slf4j
+public final class ConfigParserUtil {
+    private ConfigParserUtil() {}
+
+    public static <T extends Factory> Set<URL> getFactoryUrls(
+            ReadonlyConfig readonlyConfig,
+            ClassLoader classLoader,
+            Class<T> factoryClass,
+            String factoryId) {
+        Set<URL> factoryUrls = new HashSet<>();
+        URL factoryUrl =
+                FactoryUtil.getFactoryUrl(
+                        FactoryUtil.discoverFactory(classLoader, factoryClass, factoryId));
+        factoryUrls.add(factoryUrl);
+        getCatalogFactoryUrl(readonlyConfig, classLoader).ifPresent(factoryUrls::add);
+        return factoryUrls;
+    }
+
+    private static Optional<URL> getCatalogFactoryUrl(
+            ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
+        Map<String, String> catalogOptions =
+                readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new HashMap<>());
+        // TODO: fallback key
+        String factoryId =
+                catalogOptions.getOrDefault(FACTORY_ID.key(), readonlyConfig.get(PLUGIN_NAME));
+        Optional<CatalogFactory> optionalFactory =
+                FactoryUtil.discoverOptionalFactory(classLoader, CatalogFactory.class, factoryId);
+        return optionalFactory.map(FactoryUtil::getFactoryUrl);
+    }
+
+    public static void checkGraph(
+            List<? extends Config> sources,
+            List<? extends Config> transforms,
+            List<? extends Config> sinks) {
+        log.debug("Check whether this config file can generate DAG:");
+        if (CollectionUtils.isEmpty(sources) || CollectionUtils.isEmpty(sinks)) {
+            throw new JobDefineCheckException("Source And Sink can not be null");
+        }
+        if (isSimpleGraph(sources, transforms, sinks)) {
+            checkSimpleGraph(sources, transforms, sinks);
+            return;
+        }
+        checkComplexGraph(sources, transforms, sinks);
+    }
+
+    private static boolean isSimpleGraph(
+            List<? extends Config> sources,
+            List<? extends Config> transforms,
+            List<? extends Config> sinks) {
+        return sources.size() == 1
+                && sinks.size() == 1
+                && (CollectionUtils.isEmpty(transforms) || transforms.size() == 1);
+    }
+
+    private static void checkSimpleGraph(
+            List<? extends Config> sources,
+            List<? extends Config> transforms,
+            List<? extends Config> sinks) {
+        log.debug("This is a simple DAG.");
+        ReadonlyConfig source = ReadonlyConfig.fromConfig(sources.get(0));
+        ReadonlyConfig sink = ReadonlyConfig.fromConfig(sinks.get(0));
+        if (transforms.size() == 0) {
+            checkEdge(source, sink);
+        } else {
+            ReadonlyConfig transform = ReadonlyConfig.fromConfig(transforms.get(0));
+            checkEdge(source, transform);
+            checkEdge(transform, sink);
+        }
+    }
+
+    @Deprecated
+    private static void checkEdge(ReadonlyConfig leftConfig, ReadonlyConfig rightConfig) {
+        String tableId = getTableId(leftConfig);
+        String inputTableId = getInputIds(rightConfig).get(0);
+        if (tableId.equals(inputTableId)) {
+            return;
+        }
+
+        // Compatible with previous issues
+        log.info(
+                String.format(
+                        "Currently, incorrect configuration of %s and %s options don't affect job running. In the future we will ban incorrect configurations.",
+                        SOURCE_TABLE_NAME.key(), RESULT_TABLE_NAME.key()));
+        if (DEFAULT_ID.equals(tableId)) {
+            log.warn(
+                    String.format(
+                            "This configuration is not recommended."
+                                    + "A source/transform(%s) is not configured with '%s' option, but subsequent transform/sink(%s) is configured with '%s' option value of '%s'.",
+                            getFactoryId(leftConfig),
+                            RESULT_TABLE_NAME.key(),
+                            getFactoryId(rightConfig),
+                            SOURCE_TABLE_NAME.key(),
+                            inputTableId));
+            return;
+        }
+        if (DEFAULT_ID.equals(inputTableId)) {
+            log.warn(
+                    String.format(
+                            "This configuration is not recommended."
+                                    + " A source/transform(%s) is configured with '%s' option value of '%s', but subsequent transform/sink(%s) is not configured with '%s' option.",
+                            getFactoryId(leftConfig),
+                            RESULT_TABLE_NAME.key(),
+                            tableId,
+                            getFactoryId(rightConfig),
+                            SOURCE_TABLE_NAME.key()));
+            return;
+        }
+        log.error(
+                String.format(
+                        "The '%s' option configured in [%s] is incorrect, and the source/transform[%s] is not found.",
+                        SOURCE_TABLE_NAME.key(), getFactoryId(rightConfig), inputTableId));
+    }
+
+    private static void checkComplexGraph(
+            List<? extends Config> sources,
+            List<? extends Config> transforms,
+            List<? extends Config> sinks) {
+        log.debug("Start checking the correctness of the complex DAG: ");
+        log.debug(
+                String.format(
+                        "Phase 1: Check whether '%s' option is configured.",
+                        RESULT_TABLE_NAME.key()));
+        checkExistTableId(sources);
+        checkExistTableId(transforms);
+        log.debug(
+                String.format(
+                        "Phase 2: Check whether '%s' option is configured.",
+                        SOURCE_TABLE_NAME.key()));
+        checkExistInputTableId(transforms);
+        checkExistInputTableId(sinks);
+
+        log.debug("Phase 3: Generate virtual vertices.");
+        Map<String, Tuple2<Config, VertexStatus>> vertexStatusMap = new HashMap<>();
+        fillVirtualVertices(sources, vertexStatusMap);
+        fillVirtualVertices(transforms, vertexStatusMap);
+        log.debug("Phase 4: Check if a non-existent vertex is used.");
+        checkInputId(transforms, vertexStatusMap);
+        checkInputId(sinks, vertexStatusMap);
+        log.debug("Phase 5: Check if there are unused vertex.");
+        checkLinked(vertexStatusMap);
+    }
+
+    private static void fillVirtualVertices(
+            List<? extends Config> configs,
+            Map<String, Tuple2<Config, VertexStatus>> vertexStatusMap) {
+        for (Config config : configs) {
+            vertexStatusMap.compute(
+                    config.getString(RESULT_TABLE_NAME.key()),
+                    (id, old) -> {
+                        if (old != null) {
+                            throw new JobDefineCheckException(
+                                    String.format(
+                                            "The value of the '%s' option of the (%s and %s) plugins is both '%s', and they must be different.",
+                                            RESULT_TABLE_NAME.key(),
+                                            config.getString(PLUGIN_NAME.key()),
+                                            old._1().getString(PLUGIN_NAME.key()),
+                                            id));
+                        }
+                        return new Tuple2<>(config, VertexStatus.CREATED);
+                    });
+        }
+    }
+
+    private static void checkInputId(
+            List<? extends Config> configs,
+            Map<String, Tuple2<Config, VertexStatus>> vertexStatusMap) {
+        for (Config config : configs) {
+            List<String> inputIds = getInputIds(ReadonlyConfig.fromConfig(config));
+            inputIds.forEach(
+                    inputId ->
+                            vertexStatusMap.compute(
+                                    inputId,
+                                    (id, old) -> {
+                                        if (old == null) {
+                                            throw new JobDefineCheckException(
+                                                    String.format(
+                                                            "The '%s' option configured in [%s] is incorrect, and the source/transform[%s] is not found.",
+                                                            SOURCE_TABLE_NAME.key(),
+                                                            config.getString(PLUGIN_NAME.key()),
+                                                            id));
+                                        }
+                                        return new Tuple2<>(old._1(), VertexStatus.LINKED);
+                                    }));
+        }
+    }
+
+    private static void checkLinked(Map<String, Tuple2<Config, VertexStatus>> vertexStatusMap) {
+        vertexStatusMap.forEach(
+                (id, vertex) -> {
+                    if (vertex._2() == VertexStatus.CREATED) {
+                        throw new JobDefineCheckException(
+                                String.format(
+                                        "The '%s' option configured is incorrect, this table(%s) belonging to source/transform(%s) is not used.",
+                                        SOURCE_TABLE_NAME.key(),
+                                        id,
+                                        vertex._1().getString(PLUGIN_NAME.key())));
+                    }
+                });
+    }
+
+    private static void checkExistTableId(List<? extends Config> configs) {
+        for (Config config : configs) {
+            if (!config.hasPath(RESULT_TABLE_NAME.key())) {
+                throw new JobDefineCheckException(
+                        String.format(
+                                "The source/transform(%s) is not configured with '%s' option",
+                                config.getString(PLUGIN_NAME.key()), RESULT_TABLE_NAME.key()),
+                        new OptionValidationException(RESULT_TABLE_NAME));
+            }
+        }
+    }
+
+    private static void checkExistInputTableId(List<? extends Config> configs) {
+        for (Config config : configs) {
+            if (!config.hasPath(SOURCE_TABLE_NAME.key())) {
+                throw new JobDefineCheckException(
+                        String.format(
+                                "The transform/sink(%s) is not configured with '%s' option",
+                                config.getString(PLUGIN_NAME.key()), SOURCE_TABLE_NAME.key()),
+                        new OptionValidationException(SOURCE_TABLE_NAME));
+            }
+        }
+    }
+
+    private static String getTableId(ReadonlyConfig config) {
+        return config.getOptional(RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+    }
+
+    static List<String> getInputIds(ReadonlyConfig config) {
+        return config.getOptional(SOURCE_TABLE_NAME).orElse(Collections.singletonList(DEFAULT_ID));
+    }
+
+    public static String getFactoryId(ReadonlyConfig readonlyConfig) {
+        return readonlyConfig.getOptional(FACTORY_ID).orElse(readonlyConfig.get(PLUGIN_NAME));
+    }
+
+    private enum VertexStatus {
+        CREATED,
+        LINKED
+    }
+}
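
A minimal usage sketch of the new pre-check, assuming a typesafe Config already
parsed from a job file (a real caller would guard the optional "transform"
block, as JobConfigParser does via TypesafeConfigUtils.getConfigList):

    import org.apache.seatunnel.shade.com.typesafe.config.Config;

    import org.apache.seatunnel.engine.core.parse.ConfigParserUtil;

    import java.util.List;

    public class DagCheckSketch {
        public static void check(Config job) {
            List<? extends Config> sources = job.getConfigList("source");
            List<? extends Config> transforms = job.getConfigList("transform");
            List<? extends Config> sinks = job.getConfigList("sink");
            // Throws JobDefineCheckException on a duplicate result_table_name,
            // an unknown source_table_name, or an unused vertex (phases 1-5 above).
            ConfigParserUtil.checkGraph(sources, transforms, sinks);
        }
    }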
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
index 32f824c24..820b92c89 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
@@ -21,12 +21,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -61,16 +59,6 @@ public class ConnectorInstanceLoader {
 
         SeaTunnelSource seaTunnelSource =
                 sourcePluginDiscovery.createPluginInstance(pluginIdentifier, pluginJars);
-        seaTunnelSource.prepare(sourceConfig);
-        seaTunnelSource.setJobContext(jobContext);
-        if (jobContext.getJobMode() == JobMode.BATCH
-                && seaTunnelSource.getBoundedness()
-                        == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "'%s' source don't support off-line job.",
-                            seaTunnelSource.getPluginName()));
-        }
         return new ImmutablePair<>(seaTunnelSource, new HashSet<>(pluginJarPaths));
     }
 
@@ -87,13 +75,6 @@ public class ConnectorInstanceLoader {
                 sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
         SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                 sinkPluginDiscovery.createPluginInstance(pluginIdentifier, pluginJars);
-        seaTunnelSink.prepare(sinkConfig);
-        seaTunnelSink.setJobContext(jobContext);
-        if (seaTunnelSink.getClass().isAssignableFrom(SupportDataSaveMode.class)) {
-            SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
-            saveModeSink.checkOptions(sinkConfig);
-        }
-
         return new ImmutablePair<>(seaTunnelSink, new HashSet<>(pluginJarPaths));
     }
 
@@ -111,8 +92,6 @@ public class ConnectorInstanceLoader {
                 transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
         SeaTunnelTransform<?> seaTunnelTransform =
                 transformPluginDiscovery.createPluginInstance(pluginIdentifier, pluginJars);
-        seaTunnelTransform.prepare(transformConfig);
-        seaTunnelTransform.setJobContext(jobContext);
         return new ImmutablePair<>(seaTunnelTransform, new HashSet<>(pluginJarPaths));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 04dcbe8d0..9b6812763 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -20,496 +20,187 @@ package org.apache.seatunnel.engine.core.parse;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
-import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
-import com.google.common.collect.Lists;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.Data;
 import lombok.NonNull;
 import scala.Serializable;
+import scala.Tuple2;
 
 import java.net.URL;
-import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.DEFAULT_ID;
+import static org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
+import static org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.ensureJobModeMatch;
+import static org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode;
 
 @Data
 public class JobConfigParser {
     private static final ILogger LOGGER = Logger.getLogger(JobConfigParser.class);
-    private String jobDefineFilePath;
     private IdGenerator idGenerator;
 
-    private Map<Action, String> alreadyTransformActionMap = new HashMap<>();
-
-    private Map<String, List<Config>> transformResultTableNameMap = new HashMap<>();
-    private Map<String, List<Config>> transformSourceTableNameMap = new HashMap<>();
-
-    private Map<String, List<Config>> sourceResultTableNameMap = new HashMap<>();
-
-    private List<Action> actions = new ArrayList<>();
-    private Set<URL> jarUrlsSet = new HashSet<>();
-
-    private JobConfig jobConfig;
-
-    private Config seaTunnelJobConfig;
-
-    private Config envConfigs;
-
     private List<URL> commonPluginJars;
 
-    public JobConfigParser(
-            @NonNull String jobDefineFilePath,
-            @NonNull IdGenerator idGenerator,
-            @NonNull JobConfig jobConfig) {
-        this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList());
-    }
-
-    public JobConfigParser(
-            @NonNull String jobDefineFilePath,
-            @NonNull IdGenerator idGenerator,
-            @NonNull JobConfig jobConfig,
-            @NonNull List<URL> commonPluginJars) {
-        this.jobDefineFilePath = jobDefineFilePath;
+    public JobConfigParser(@NonNull IdGenerator idGenerator, @NonNull List<URL> commonPluginJars) {
         this.idGenerator = idGenerator;
-        this.jobConfig = jobConfig;
-        this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath));
-        this.envConfigs = seaTunnelJobConfig.getConfig("env");
         this.commonPluginJars = commonPluginJars;
     }
 
-    public ImmutablePair<List<Action>, Set<URL>> parse() {
-        Thread.currentThread()
-                .setContextClassLoader(new SeaTunnelChildFirstClassLoader(new ArrayList<>()));
-        List<? extends Config> sinkConfigs = seaTunnelJobConfig.getConfigList("sink");
-        List<? extends Config> transformConfigs =
-                TypesafeConfigUtils.getConfigList(
-                        seaTunnelJobConfig, "transform", Collections.emptyList());
-        List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
-
-        if (CollectionUtils.isEmpty(sinkConfigs) || CollectionUtils.isEmpty(sourceConfigs)) {
-            throw new JobDefineCheckException("Source And Sink can not be null");
-        }
-
-        jobConfigAnalyze(envConfigs);
-
-        if (sinkConfigs.size() == 1
-                && sourceConfigs.size() == 1
-                && (CollectionUtils.isEmpty(transformConfigs) || transformConfigs.size() == 1)) {
-            sampleAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
-        } else {
-            complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
-        }
-        actions.forEach(this::addCommonPluginJarsToAction);
-        jarUrlsSet.addAll(commonPluginJars);
-        return new ImmutablePair<>(actions, jarUrlsSet);
-    }
-
-    private void addCommonPluginJarsToAction(Action action) {
-        action.getJarUrls().addAll(commonPluginJars);
-        if (!action.getUpstream().isEmpty()) {
-            action.getUpstream().forEach(this::addCommonPluginJarsToAction);
-        }
-    }
-
-    void jobConfigAnalyze(@NonNull Config envConfigs) {
-        if (envConfigs.hasPath(EnvCommonOptions.JOB_MODE.key())) {
-            jobConfig
-                    .getJobContext()
-                    .setJobMode(envConfigs.getEnum(JobMode.class, EnvCommonOptions.JOB_MODE.key()));
-        } else {
-            jobConfig.getJobContext().setJobMode(EnvCommonOptions.JOB_MODE.defaultValue());
-        }
-
-        if (StringUtils.isEmpty(jobConfig.getName())
-                || jobConfig.getName().equals(Constants.LOGO)) {
-            if (envConfigs.hasPath(EnvCommonOptions.JOB_NAME.key())) {
-                jobConfig.setName(envConfigs.getString(EnvCommonOptions.JOB_NAME.key()));
-            } else {
-                jobConfig.setName(EnvCommonOptions.JOB_NAME.defaultValue());
-            }
-        }
-
-        if (envConfigs.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
-            jobConfig
-                    .getEnvOptions()
-                    .put(
-                            EnvCommonOptions.CHECKPOINT_INTERVAL.key(),
-                            envConfigs.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
-        }
-    }
-
-    /**
-     * If there are multiple sources or multiple transforms or multiple sink, We will rely on
-     * source_table_name and result_table_name to build actions pipeline. So in this case
-     * result_table_name is necessary for the Source Connector and all of result_table_name and
-     * source_table_name are necessary for Transform Connector. By the end, source_table_name is
-     * necessary for Sink Connector.
-     */
-    private void complexAnalyze(
-            List<? extends Config> sourceConfigs,
-            List<? extends Config> transformConfigs,
-            List<? extends Config> sinkConfigs) {
-        initRelationMap(sourceConfigs, transformConfigs);
-
-        for (int configIndex = 0; configIndex < sinkConfigs.size(); configIndex++) {
-            Config config = sinkConfigs.get(configIndex);
-            ImmutablePair<
-                            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>,
-                            Set<URL>>
-                    sinkListImmutablePair =
-                            ConnectorInstanceLoader.loadSinkInstance(
-                                    config, jobConfig.getJobContext(), commonPluginJars);
-
-            String sinkActionName =
-                    createSinkActionName(
-                            configIndex,
-                            sinkListImmutablePair.getLeft().getPluginName(),
-                            getTableName(config));
-            SinkAction sinkAction =
-                    createSinkAction(
-                            idGenerator.getNextId(),
-                            sinkActionName,
-                            sinkListImmutablePair.getLeft(),
-                            sinkListImmutablePair.getRight());
-
-            actions.add(sinkAction);
-            if (!config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
-                throw new JobDefineCheckException(
-                        CommonOptions.SOURCE_TABLE_NAME
-                                + " must be set in the sink plugin config when the job have complex dependencies");
-            }
-            String sourceTableName = config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
-            List<Config> transformConfigList = transformResultTableNameMap.get(sourceTableName);
-            SeaTunnelDataType<?> dataType;
-            if (CollectionUtils.isEmpty(transformConfigList)) {
-                dataType = sourceAnalyze(sourceTableName, sinkAction);
-            } else if (transformConfigList.size() > 1) {
-                throw new JobDefineCheckException(
-                        "Only UnionTransform can have more than one upstream, "
-                                + sinkAction.getName()
-                                + " is not UnionTransform Connector");
-            } else {
-                dataType = transformAnalyze(sourceTableName, sinkAction);
-            }
-            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
-                    sinkListImmutablePair.getLeft();
-            seaTunnelSink.setTypeInfo((SeaTunnelRowType) dataType);
-            if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
-                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
-                DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
-                saveModeSink.handleSaveMode(dataSaveMode);
-            }
-        }
-    }
-
-    private SeaTunnelDataType sourceAnalyze(String sourceTableName, Action action) {
-        List<Config> sourceConfigList = sourceResultTableNameMap.get(sourceTableName);
-        if (CollectionUtils.isEmpty(sourceConfigList)) {
-            throw new JobDefineCheckException(
-                    action.getName()
-                            + " source table name ["
-                            + sourceTableName
-                            + "] can not be found");
-        }
-
-        // If a transform have more than one upstream action, the parallelism of this transform is
-        // the sum of the parallelism
-        // of its upstream action.
-        SeaTunnelDataType dataType = null;
-        AtomicInteger totalParallelism = new AtomicInteger();
-        for (int configIndex = 0; configIndex < sourceConfigList.size(); configIndex++) {
-            Config sourceConfig = sourceConfigList.get(configIndex);
-            ImmutablePair<SeaTunnelSource, Set<URL>> seaTunnelSourceListImmutablePair =
-                    ConnectorInstanceLoader.loadSourceInstance(
-                            sourceConfig, jobConfig.getJobContext(), commonPluginJars);
-            dataType = seaTunnelSourceListImmutablePair.getLeft().getProducedType();
-            String sourceActionName =
-                    createSourceActionName(
-                            configIndex,
-                            
sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
-                            getTableName(sourceConfig));
-            SourceAction sourceAction =
-                    createSourceAction(
-                            idGenerator.getNextId(),
-                            sourceActionName,
-                            seaTunnelSourceListImmutablePair.getLeft(),
-                            seaTunnelSourceListImmutablePair.getRight());
-
-            int sourceParallelism = getSourceParallelism(sourceConfig);
-            sourceAction.setParallelism(sourceParallelism);
-            totalParallelism.set(totalParallelism.get() + sourceParallelism);
-            action.addUpstream(sourceAction);
-            action.setParallelism(totalParallelism.get());
-        }
-        return dataType;
-    }
-
-    private SeaTunnelDataType<?> transformAnalyze(String sourceTableName, Action action) {
-        // find upstream transform node
-        List<Config> transformConfigList = transformResultTableNameMap.get(sourceTableName);
-        if (CollectionUtils.isEmpty(transformConfigList)) {
-            return sourceAnalyze(sourceTableName, action);
+    public Tuple2<CatalogTable, Action> parseSource(
+            Config config, JobConfig jobConfig, String tableId, int parallelism) {
+        ImmutablePair<SeaTunnelSource, Set<URL>> tuple =
+                ConnectorInstanceLoader.loadSourceInstance(
+                        config, jobConfig.getJobContext(), commonPluginJars);
+        final SeaTunnelSource source = tuple.getLeft();
+        // old logic: prepare(initialization) -> set job context
+        source.prepare(config);
+        source.setJobContext(jobConfig.getJobContext());
+        ensureJobModeMatch(jobConfig.getJobContext(), source);
+        String actionName =
+                createSourceActionName(
+                        0, config.getString(CollectionConstants.PLUGIN_NAME), getTableName(config));
+        SourceAction action =
+                new SourceAction(
+                        idGenerator.getNextId(), actionName, tuple.getLeft(), tuple.getRight());
+        action.setParallelism(parallelism);
+        SeaTunnelRowType producedType = (SeaTunnelRowType) tuple.getLeft().getProducedType();
+        CatalogTable catalogTable = CatalogTableUtil.getCatalogTable(tableId, producedType);
+        return new Tuple2<>(catalogTable, action);
+    }
+
+    public Tuple2<CatalogTable, Action> parseTransform(
+            Config config,
+            JobConfig jobConfig,
+            String tableId,
+            int parallelism,
+            SeaTunnelRowType rowType,
+            Set<Action> inputActions) {
+        final ImmutablePair<SeaTunnelTransform<?>, Set<URL>> tuple =
+                ConnectorInstanceLoader.loadTransformInstance(
+                        config, jobConfig.getJobContext(), commonPluginJars);
+        final SeaTunnelTransform<?> transform = tuple.getLeft();
+        // old logic: prepare(initialization) -> set job context -> set row type (There is a logical
+        // judgment that depends on before and after, not a simple set)
+        transform.prepare(config);
+        transform.setJobContext(jobConfig.getJobContext());
+        transform.setTypeInfo((SeaTunnelDataType) rowType);
+        final String actionName =
+                createTransformActionName(0, tuple.getLeft().getPluginName(), getTableName(config));
+        final TransformAction action =
+                new TransformAction(
+                        idGenerator.getNextId(),
+                        actionName,
+                        new ArrayList<>(inputActions),
+                        transform,
+                        tuple.getRight());
+        action.setParallelism(parallelism);
+        CatalogTable catalogTable =
+                CatalogTableUtil.getCatalogTable(
+                        tableId, (SeaTunnelRowType) transform.getProducedType());
+        return new Tuple2<>(catalogTable, action);
+    }
+
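+    // Fallback path for sinks; covers both the union case (several upstream
+    // vertices feeding one sink) and the per-table "sink template" case.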
+    public List<SinkAction<?, ?, ?, ?>> parseSinks(
+            List<List<Tuple2<CatalogTable, Action>>> inputVertices,
+            Config sinkConfig,
+            JobConfig jobConfig) {
+        List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
+        int spareParallelism = inputVertices.get(0).get(0)._2().getParallelism();
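+        // the unioned sink reuses the parallelism of the first upstream action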
+        if (inputVertices.size() > 1) {
+            // union
+            Set<Action> inputActions =
+                    inputVertices.stream()
+                            .flatMap(Collection::stream)
+                            .map(Tuple2::_2)
+                            .collect(Collectors.toSet());
+            checkProducedTypeEquals(inputActions);
+            SinkAction<?, ?, ?, ?> sinkAction =
+                    parseSink(
+                            sinkConfig,
+                            jobConfig,
+                            spareParallelism,
+                            inputVertices
+                                    .get(0)
+                                    .get(0)
+                                    ._1()
+                                    .getTableSchema()
+                                    .toPhysicalRowDataType(),
+                            inputActions);
+            sinkActions.add(sinkAction);
         } else {
-            AtomicInteger totalParallelism = new AtomicInteger();
-            SeaTunnelDataType<?> dataTypeResult = null;
-            for (int configIndex = 0; configIndex < transformConfigList.size(); configIndex++) {
-                Config config = transformConfigList.get(configIndex);
-                ImmutablePair<SeaTunnelTransform<?>, Set<URL>> transformListImmutablePair =
-                        ConnectorInstanceLoader.loadTransformInstance(
-                                config, jobConfig.getJobContext(), commonPluginJars);
-                String transformActionName =
-                        createTransformActionName(
-                                configIndex,
-                                transformListImmutablePair.getLeft().getPluginName(),
-                                getTableName(config));
-                TransformAction transformAction =
-                        createTransformAction(
-                                idGenerator.getNextId(),
-                                transformActionName,
-                                transformListImmutablePair.getLeft(),
-                                transformListImmutablePair.getRight());
-
-                action.addUpstream(transformAction);
-                SeaTunnelDataType dataType =
-                        transformAnalyze(
-                                config.getString(CommonOptions.SOURCE_TABLE_NAME.key()),
-                                transformAction);
-                transformListImmutablePair.getLeft().setTypeInfo(dataType);
-                dataTypeResult = transformListImmutablePair.getLeft().getProducedType();
-                totalParallelism.set(totalParallelism.get() + transformAction.getParallelism());
-                action.setParallelism(totalParallelism.get());
+            // sink template
+            for (Tuple2<CatalogTable, Action> tableTuple : inputVertices.get(0)) {
+                CatalogTable catalogTable = tableTuple._1();
+                Action inputAction = tableTuple._2();
+                int parallelism = inputAction.getParallelism();
+                SinkAction<?, ?, ?, ?> sinkAction =
+                        parseSink(
+                                sinkConfig,
+                                jobConfig,
+                                parallelism,
+                                catalogTable.getTableSchema().toPhysicalRowDataType(),
+                                Collections.singleton(inputAction));
+                sinkActions.add(sinkAction);
             }
-            return dataTypeResult;
         }
+        return sinkActions;
     }
 
-    private void initRelationMap(
-            List<? extends Config> sourceConfigs, List<? extends Config> transformConfigs) {
-        for (Config config : sourceConfigs) {
-            if (!config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
-                throw new JobDefineCheckException(
-                        CommonOptions.RESULT_TABLE_NAME.key()
-                                + " must be set in the source plugin config when the job have complex dependencies");
-            }
-            String resultTableName = config.getString(CommonOptions.RESULT_TABLE_NAME.key());
-            sourceResultTableNameMap.computeIfAbsent(resultTableName, k -> new ArrayList<>());
-            sourceResultTableNameMap.get(resultTableName).add(config);
-        }
-
-        for (Config config : transformConfigs) {
-            if (!config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
-                throw new JobDefineCheckException(
-                        CommonOptions.RESULT_TABLE_NAME.key()
-                                + " must be set in the transform plugin config when the job have complex dependencies");
-            }
-
-            if (!config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
-                throw new JobDefineCheckException(
-                        CommonOptions.SOURCE_TABLE_NAME.key()
-                                + " must be set in the transform plugin config when the job have complex dependencies");
-            }
-            String resultTableName = config.getString(CommonOptions.RESULT_TABLE_NAME.key());
-            String sourceTableName = config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
-            if (Objects.equals(sourceTableName, resultTableName)) {
-                throw new JobDefineCheckException(
-                        String.format(
-                                "Source{%s} and result{%s} table name cannot be equals",
-                                sourceTableName, resultTableName));
-            }
-
-            transformResultTableNameMap.computeIfAbsent(resultTableName, k -> new ArrayList<>());
-            transformResultTableNameMap.get(resultTableName).add(config);
-
-            transformSourceTableNameMap.computeIfAbsent(sourceTableName, k -> new ArrayList<>());
-            transformSourceTableNameMap.get(sourceTableName).add(config);
-        }
-    }
-
-    /**
-     * If there is only one Source and one Sink and at most one Transform, We simply build actions
-     * pipeline in the following order Source | Transform(If have) | Sink
-     */
-    private void sampleAnalyze(
-            List<? extends Config> sourceConfigs,
-            List<? extends Config> transformConfigs,
-            List<? extends Config> sinkConfigs) {
-        ImmutablePair<SeaTunnelSource, Set<URL>> pair =
-                ConnectorInstanceLoader.loadSourceInstance(
-                        sourceConfigs.get(0), jobConfig.getJobContext(), commonPluginJars);
-        String sourceActionName =
-                createSourceActionName(0, pair.getLeft().getPluginName(), "default");
-        SourceAction sourceAction =
-                createSourceAction(
-                        idGenerator.getNextId(), sourceActionName, pair.getLeft(), pair.getRight());
-        sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
-        SeaTunnelDataType dataType = sourceAction.getSource().getProducedType();
-
-        Action sinkUpstreamAction = sourceAction;
-
-        if (!CollectionUtils.isEmpty(transformConfigs)) {
-            ImmutablePair<SeaTunnelTransform<?>, Set<URL>> transformListImmutablePair =
-                    ConnectorInstanceLoader.loadTransformInstance(
-                            transformConfigs.get(0), jobConfig.getJobContext(), commonPluginJars);
-            transformListImmutablePair.getLeft().setTypeInfo(dataType);
-
-            dataType = transformListImmutablePair.getLeft().getProducedType();
-            String transformActionName =
-                    createTransformActionName(
-                            0, transformListImmutablePair.getLeft().getPluginName(), "default");
-            TransformAction transformAction =
-                    createTransformAction(
-                            idGenerator.getNextId(),
-                            transformActionName,
-                            Lists.newArrayList(sourceAction),
-                            transformListImmutablePair.getLeft(),
-                            transformListImmutablePair.getRight());
-
-            initTransformParallelism(
-                    transformConfigs,
-                    sourceAction,
-                    transformListImmutablePair.getLeft(),
-                    transformAction);
-
-            sinkUpstreamAction = transformAction;
-        }
-
-        ImmutablePair<
+    private SinkAction<?, ?, ?, ?> parseSink(
+            Config config,
+            JobConfig jobConfig,
+            int parallelism,
+            SeaTunnelRowType rowType,
+            Set<Action> inputActions) {
+        final ImmutablePair<
                         SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>,
                         Set<URL>>
-                sinkListImmutablePair =
+                tuple =
                         ConnectorInstanceLoader.loadSinkInstance(
-                                sinkConfigs.get(0), jobConfig.getJobContext(), commonPluginJars);
-        String sinkActionName =
-                createSinkActionName(0, sinkListImmutablePair.getLeft().getPluginName(), "default");
-        SinkAction sinkAction =
-                createSinkAction(
+                                config, jobConfig.getJobContext(), commonPluginJars);
+        final SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> sink =
+                tuple.getLeft();
+        // old logic: prepare(initialization) -> set job context -> set row type
+        // (later steps contain logic that depends on the earlier ones, so the
+        // order must be preserved; these are not simple setter calls)
+        sink.prepare(config);
+        sink.setJobContext(jobConfig.getJobContext());
+        sink.setTypeInfo(rowType);
+        handleSaveMode(sink);
+        final String actionName =
+                createSinkActionName(0, tuple.getLeft().getPluginName(), getTableName(config));
+        final SinkAction action =
+                new SinkAction<>(
                         idGenerator.getNextId(),
-                        sinkActionName,
-                        Lists.newArrayList(sinkUpstreamAction),
-                        sinkListImmutablePair.getLeft(),
-                        sinkListImmutablePair.getRight());
-        SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = sinkAction.getSink();
-        seaTunnelSink.setTypeInfo((SeaTunnelRowType) dataType);
-        sinkAction.setParallelism(sinkUpstreamAction.getParallelism());
-        if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
-            SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
-            DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
-            saveModeSink.handleSaveMode(dataSaveMode);
-        }
-        actions.add(sinkAction);
-    }
-
-    private void initTransformParallelism(
-            List<? extends Config> transformConfigs,
-            Action upstreamAction,
-            SeaTunnelTransform seaTunnelTransform,
-            TransformAction transformAction) {
-        if (seaTunnelTransform instanceof PartitionSeaTunnelTransform
-                && transformConfigs.get(0).hasPath(CommonOptions.PARALLELISM.key())) {
-            transformAction.setParallelism(
-                    transformConfigs.get(0).getInt(CommonOptions.PARALLELISM.key()));
-        } else {
-            // If transform type is not RePartitionTransform, Using the parallelism of its upstream
-            // operators.
-            transformAction.setParallelism(upstreamAction.getParallelism());
-        }
-    }
-
-    private int getSourceParallelism(Config sourceConfig) {
-        if (sourceConfig.hasPath(CommonOptions.PARALLELISM.key())) {
-            int sourceParallelism = sourceConfig.getInt(CommonOptions.PARALLELISM.key());
-            return Math.max(sourceParallelism, 1);
-        }
-        int executionParallelism = 0;
-        if (envConfigs.hasPath(CommonOptions.PARALLELISM.key())) {
-            executionParallelism = envConfigs.getInt(CommonOptions.PARALLELISM.key());
-        }
-        return Math.max(executionParallelism, 1);
-    }
-
-    private SourceAction createSourceAction(
-            long id, @NonNull String name, @NonNull SeaTunnelSource source, Set<URL> jarUrls) {
-        if (!CollectionUtils.isEmpty(jarUrls)) {
-            jarUrlsSet.addAll(jarUrls);
-        }
-        return new SourceAction(id, name, source, jarUrls);
-    }
-
-    private TransformAction createTransformAction(
-            long id,
-            @NonNull String name,
-            @NonNull List<Action> upstreams,
-            @NonNull SeaTunnelTransform transformation,
-            Set<URL> jarUrls) {
-        if (!CollectionUtils.isEmpty(jarUrls)) {
-            jarUrlsSet.addAll(jarUrls);
-        }
-        return new TransformAction(id, name, upstreams, transformation, jarUrls);
-    }
-
-    private SinkAction createSinkAction(
-            long id,
-            @NonNull String name,
-            @NonNull List<Action> upstreams,
-            @NonNull SeaTunnelSink sink,
-            Set<URL> jarUrls) {
-        if (!CollectionUtils.isEmpty(jarUrls)) {
-            jarUrlsSet.addAll(jarUrls);
-        }
-        return new SinkAction(id, name, upstreams, sink, jarUrls);
-    }
-
-    private TransformAction createTransformAction(
-            long id,
-            @NonNull String name,
-            @NonNull SeaTunnelTransform transformation,
-            Set<URL> jarUrls) {
-        if (!CollectionUtils.isEmpty(jarUrls)) {
-            jarUrlsSet.addAll(jarUrls);
-        }
-        return new TransformAction(id, name, transformation, jarUrls);
-    }
-
-    private SinkAction createSinkAction(
-            long id, @NonNull String name, @NonNull SeaTunnelSink sink, Set<URL> jarUrls) {
-        if (!CollectionUtils.isEmpty(jarUrls)) {
-            jarUrlsSet.addAll(jarUrls);
-        }
-        return new SinkAction(id, name, sink, jarUrls);
+                        actionName,
+                        new ArrayList<>(inputActions),
+                        sink,
+                        tuple.getRight());
+        action.setParallelism(parallelism);
+        return action;
     }
 
     static String createSourceActionName(int configIndex, String pluginName, String tableName) {
@@ -525,27 +216,17 @@ public class JobConfigParser {
     }
 
     static String getTableName(Config config) {
-        return getTableName(config, "default");
+        return getTableName(config, DEFAULT_ID);
     }
 
     static String getTableName(Config config, String defaultValue) {
-        String sourceTableName = null;
-        if (config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
-            sourceTableName = config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
-        }
         String resultTableName = null;
         if (config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
             resultTableName = config.getString(CommonOptions.RESULT_TABLE_NAME.key());
         }
-        if (sourceTableName != null && resultTableName != null) {
-            return String.format("%s_%s", sourceTableName, resultTableName);
-        }
-        if (sourceTableName == null) {
-            return resultTableName;
-        }
         if (resultTableName == null) {
-            return sourceTableName;
+            return defaultValue;
         }
-        return defaultValue;
+        return resultTableName;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 4c96a20e8..8777a9228 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.core.parse;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.env.ParsingMode;
@@ -27,17 +28,22 @@ import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceOptions;
 import org.apache.seatunnel.api.source.SourceSplit;
-import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -48,6 +54,7 @@ import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -55,6 +62,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import lombok.extern.slf4j.Slf4j;
 import scala.Tuple2;
 
 import java.io.IOException;
@@ -62,18 +70,30 @@ import java.io.Serializable;
 import java.net.URL;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId;
+import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryUrls;
+import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputIds;
+
+@Slf4j
 public class MultipleTableJobConfigParser {
 
-    private static final ILogger LOGGER = Logger.getLogger(JobConfigParser.class);
+    private static final ILogger LOGGER = Logger.getLogger(MultipleTableJobConfigParser.class);
+
+    static final String DEFAULT_ID = "default-identifier";
 
     private final IdGenerator idGenerator;
     private final JobConfig jobConfig;
@@ -85,8 +105,6 @@ public class MultipleTableJobConfigParser {
 
     private final JobConfigParser fallbackParser;
 
-    private final ParsingMode parsingMode;
-
     public MultipleTableJobConfigParser(
             String jobDefineFilePath, IdGenerator idGenerator, JobConfig jobConfig) {
         this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList());
@@ -102,15 +120,10 @@
         this.commonPluginJars = commonPluginJars;
         this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath));
         this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
-        this.fallbackParser =
-                new JobConfigParser(jobDefineFilePath, idGenerator, jobConfig, commonPluginJars);
-        this.parsingMode = envOptions.get(EnvCommonOptions.DAG_PARSING_MODE);
+        this.fallbackParser = new JobConfigParser(idGenerator, commonPluginJars);
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse() {
-        if (parsingMode == ParsingMode.SINGLENESS) {
-            return fallbackParser.parse();
-        }
         List<URL> connectorJars = new ArrayList<>();
         try {
             connectorJars = FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel"));
@@ -119,24 +132,35 @@ public class MultipleTableJobConfigParser {
         }
         ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(connectorJars);
         Thread.currentThread().setContextClassLoader(classLoader);
-        // TODO: Support configuration transform
-        List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
-        List<? extends Config> sinkConfigs = seaTunnelJobConfig.getConfigList("sink");
-        if (CollectionUtils.isEmpty(sourceConfigs) || CollectionUtils.isEmpty(sinkConfigs)) {
-            throw new JobDefineCheckException("Source And Sink can not be null");
-        }
+        List<? extends Config> sourceConfigs =
+                TypesafeConfigUtils.getConfigList(
+                        seaTunnelJobConfig, "source", Collections.emptyList());
+        List<? extends Config> transformConfigs =
+                TypesafeConfigUtils.getConfigList(
+                        seaTunnelJobConfig, "transform", Collections.emptyList());
+        List<? extends Config> sinkConfigs =
+                TypesafeConfigUtils.getConfigList(
+                        seaTunnelJobConfig, "sink", Collections.emptyList());
+
+        ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs);
+
         this.fillJobConfig();
-        Map<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap = new HashMap<>();
+
+        LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap =
+                new LinkedHashMap<>();
+
         for (Config sourceConfig : sourceConfigs) {
             Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
-                    parserSource(sourceConfig, classLoader);
+                    parseSource(sourceConfig, classLoader);
             tableWithActionMap.put(tuple2._1(), tuple2._2());
         }
+
+        parseTransforms(transformConfigs, classLoader, tableWithActionMap);
+
         List<Action> sinkActions = new ArrayList<>();
         for (int configIndex = 0; configIndex < sinkConfigs.size(); configIndex++) {
             Config sinkConfig = sinkConfigs.get(configIndex);
-            sinkActions.addAll(
-                    parserSink(configIndex, sinkConfig, classLoader, tableWithActionMap));
+            sinkActions.addAll(parseSink(configIndex, sinkConfig, classLoader, tableWithActionMap));
         }
         Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
         factoryUrls.addAll(commonPluginJars);
@@ -182,44 +206,86 @@
                                        .put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(), interval));
     }
 
-    public Tuple2<String, List<Tuple2<CatalogTable, Action>>> parserSource(
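+    // Core of mixing Factory and Plugin SPI: probe the optional Factory with a
+    // "virtual" create call; if no factory is discovered, or it still throws the
+    // documented UnsupportedOperationException, fall back to the Plugin SPI.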
+    private static <T extends Factory> boolean isFallback(
+            ClassLoader classLoader,
+            Class<T> factoryClass,
+            String factoryId,
+            Consumer<T> virtualCreator) {
+        Optional<T> factory =
+                FactoryUtil.discoverOptionalFactory(classLoader, factoryClass, factoryId);
+        if (!factory.isPresent()) {
+            return true;
+        }
+        try {
+            virtualCreator.accept(factory.get());
+        } catch (Exception e) {
+            if (e instanceof UnsupportedOperationException
+                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
+                            .equals(e.getMessage())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private int getParallelism(ReadonlyConfig config) {
+        return Math.max(
+                1,
+                config.getOptional(CommonOptions.PARALLELISM)
+                        .orElse(envOptions.get(CommonOptions.PARALLELISM)));
+    }
+
+    public Tuple2<String, List<Tuple2<CatalogTable, Action>>> parseSource(
             Config sourceConfig, ClassLoader classLoader) {
-        List<CatalogTable> catalogTables =
+        final ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig);
+        final String factoryId = getFactoryId(readonlyConfig);
+        final String tableId =
+                readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+
+        final int parallelism = getParallelism(readonlyConfig);
+
+        boolean fallback =
+                isFallback(
+                        classLoader,
+                        TableSourceFactory.class,
+                        factoryId,
+                        (factory) -> factory.createSource(null));
+        if (fallback) {
+            Tuple2<CatalogTable, Action> tuple =
+                    fallbackParser.parseSource(sourceConfig, jobConfig, tableId, parallelism);
+            return new Tuple2<>(tableId, Collections.singletonList(tuple));
+        }
+
+        final List<CatalogTable> catalogTables =
                 CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
         if (catalogTables.isEmpty()) {
             throw new JobDefineCheckException(
                     "The source needs catalog table, please configure `catalog` or `schema` options.");
         }
-        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig);
-        String factoryId = getFactoryId(readonlyConfig);
-        String tableId =
-                readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse("default");
-
-        if (parsingMode == ParsingMode.SHARDING) {
-            catalogTables = Collections.singletonList(catalogTables.get(0));
+        if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) == ParsingMode.SHARDING) {
+            CatalogTable shardingTable = catalogTables.get(0);
+            catalogTables.clear();
+            catalogTables.add(shardingTable);
         }
+
         List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>>>
                 sources =
                         FactoryUtil.createAndPrepareSource(
                                 catalogTables, readonlyConfig, classLoader, factoryId);
 
-        // get factory urls
-        Set<URL> factoryUrls = new HashSet<>();
-        URL factoryUrl =
-                FactoryUtil.getFactoryUrl(
-                        FactoryUtil.discoverFactory(
-                                classLoader, TableSourceFactory.class, factoryId));
-        factoryUrls.add(factoryUrl);
-        getCatalogFactoryUrl(sourceConfig, classLoader).ifPresent(factoryUrls::add);
+        Set<URL> factoryUrls =
+                getFactoryUrls(readonlyConfig, classLoader, TableSourceFactory.class, factoryId);
 
         List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
-        int parallelism = getParallelism(readonlyConfig);
         for (int configIndex = 0; configIndex < sources.size(); configIndex++) {
             Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>> tuple2 =
                     sources.get(configIndex);
             long id = idGenerator.getNextId();
             String actionName =
                     JobConfigParser.createSourceActionName(configIndex, factoryId, tableId);
+            SeaTunnelSource<Object, SourceSplit, Serializable> source = tuple2._1();
+            source.setJobContext(jobConfig.getJobContext());
+            ensureJobModeMatch(jobConfig.getJobContext(), source);
             SourceAction<Object, SourceSplit, Serializable> action =
                     new SourceAction<>(id, actionName, tuple2._1(), factoryUrls);
             action.setParallelism(parallelism);
@@ -230,107 +296,266 @@ public class MultipleTableJobConfigParser {
         return new Tuple2<>(tableId, actions);
     }
 
-    public static Optional<URL> getCatalogFactoryUrl(Config config, ClassLoader classLoader) {
-        // catalog url
-        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
-        Map<String, String> catalogOptions =
-                readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new HashMap<>());
-        // TODO: fallback key
-        String factoryId =
-                catalogOptions.getOrDefault(
-                        CommonOptions.FACTORY_ID.key(),
-                        readonlyConfig.get(CommonOptions.PLUGIN_NAME));
-        Optional<CatalogFactory> optionalFactory =
-                FactoryUtil.discoverOptionalFactory(classLoader, CatalogFactory.class, factoryId);
-        return optionalFactory.map(FactoryUtil::getFactoryUrl);
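+    // Guard: a BATCH job must not read from an UNBOUNDED (streaming-only) source.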
+    public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource source) {
+        if (jobContext.getJobMode() == JobMode.BATCH
+                && source.getBoundedness()
+                        == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
+            throw new JobDefineCheckException(
+                    String.format(
+                            "'%s' source doesn't support off-line jobs.", source.getPluginName()));
+        }
     }
 
-    private int getParallelism(ReadonlyConfig config) {
-        return Math.max(
-                1,
-                config.getOptional(CommonOptions.PARALLELISM)
-                        .orElse(envOptions.get(CommonOptions.PARALLELISM)));
+    public void parseTransforms(
+            List<? extends Config> transformConfigs,
+            ClassLoader classLoader,
+            LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap) {
+        if (CollectionUtils.isEmpty(transformConfigs)) {
+            return;
+        }
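+        // A queue lets a transform whose upstream has not been parsed yet be
+        // re-offered and retried after its dependencies have been created.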
+        Queue<Config> configList = new LinkedList<>(transformConfigs);
+        parseTransform(configList, classLoader, tableWithActionMap);
+    }
+
+    private void parseTransform(
+            Queue<Config> transforms,
+            ClassLoader classLoader,
+            LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap) {
+        Config config = transforms.poll();
+        final ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
+        final String factoryId = getFactoryId(readonlyConfig);
+        final List<String> inputIds = getInputIds(readonlyConfig);
+
+        List<Tuple2<CatalogTable, Action>> inputs =
+                inputIds.stream()
+                        .map(tableWithActionMap::get)
+                        .filter(Objects::nonNull)
+                        .peek(
+                                input -> {
+                                    if (input.size() > 1) {
+                                        throw new JobDefineCheckException(
+                                                "Adding transform to multi-table source is not supported.");
+                                    }
+                                })
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toList());
+        if (inputs.isEmpty()) {
+            if (transforms.isEmpty()) {
+                // Tolerates incorrect configuration of simple graph
+                inputs = findLast(tableWithActionMap);
+            } else {
+                // The previous transform has not been created
+                transforms.offer(config);
+                return;
+            }
+        }
+
+        final String tableId =
+                readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+
+        boolean fallback =
+                isFallback(
+                        classLoader,
+                        TableTransformFactory.class,
+                        factoryId,
+                        (factory) -> factory.createTransform(null));
+
+        Set<Action> inputActions = inputs.stream().map(Tuple2::_2).collect(Collectors.toSet());
+        SeaTunnelDataType<?> expectedType = getProducedType(inputs.get(0)._2());
+        checkProducedTypeEquals(inputActions);
+        int spareParallelism = inputs.get(0)._2().getParallelism();
+        if (fallback) {
+            int parallelism =
+                    readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
+            Tuple2<CatalogTable, Action> tuple =
+                    fallbackParser.parseTransform(
+                            config,
+                            jobConfig,
+                            tableId,
+                            parallelism,
+                            (SeaTunnelRowType) expectedType,
+                            inputActions);
+            tableWithActionMap.put(tableId, Collections.singletonList(tuple));
+            return;
+        }
+
+        // TODO: TableTransformFactory is not available.
+        return;
+    }
+
+    public static SeaTunnelDataType<?> getProducedType(Action action) {
+        if (action instanceof SourceAction) {
+            return ((SourceAction<?, ?, ?>) action).getSource().getProducedType();
+        } else if (action instanceof TransformAction) {
+            return ((TransformAction) action).getTransform().getProducedType();
+        }
+        throw new UnsupportedOperationException();
+    }
+
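+    // All inputs merged into one transform/sink must produce the same row
+    // structure; compare each input's produced type against the first one.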
+    public static void checkProducedTypeEquals(Set<Action> inputActions) {
+        SeaTunnelDataType<?> expectedType = getProducedType(new ArrayList<>(inputActions).get(0));
+        for (Action action : inputActions) {
+            SeaTunnelDataType<?> producedType = getProducedType(action);
+            if (!expectedType.equals(producedType)) {
+                throw new JobDefineCheckException(
+                        "Transform/Sink doesn't support processing data with two different structures.");
+            }
+        }
+    }
+
+    @Deprecated
+    private static <T> T findLast(LinkedHashMap<?, T> map) {
+        int size = map.size();
+        int i = 1;
+        for (T value : map.values()) {
+            if (i == size) {
+                return value;
+            }
+            i++;
+        }
+        // never reached: the loop always returns on the last entry
+        return null;
     }
 
-    public List<SinkAction<?, ?, ?, ?>> parserSink(
+    public List<SinkAction<?, ?, ?, ?>> parseSink(
             int configIndex,
             Config sinkConfig,
             ClassLoader classLoader,
-            Map<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap) {
+            LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap) {
+
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
+        String factoryId = getFactoryId(readonlyConfig);
+        List<String> inputIds = getInputIds(readonlyConfig);
+
+        List<List<Tuple2<CatalogTable, Action>>> inputVertices =
+                inputIds.stream()
+                        .map(tableWithActionMap::get)
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList());
+        if (inputVertices.isEmpty()) {
+            // Tolerates incorrect configuration of simple graph
+            inputVertices = Collections.singletonList(findLast(tableWithActionMap));
+        } else if (inputVertices.size() > 1) {
+            for (List<Tuple2<CatalogTable, Action>> inputVertex : inputVertices) {
+                if (inputVertex.size() > 1) {
+                    throw new JobDefineCheckException(
+                            "Sink doesn't support simultaneous writing of data from multi-table source and other sources.");
+                }
+            }
+        }
+
+        boolean fallback =
+                isFallback(
+                        classLoader,
+                        TableSinkFactory.class,
+                        factoryId,
+                        (factory) -> factory.createSink(null));
+        if (fallback) {
+            return fallbackParser.parseSinks(inputVertices, sinkConfig, jobConfig);
+        }
+
         Map<TablePath, CatalogTable> tableMap =
                 CatalogTableUtil.getCatalogTables(sinkConfig, classLoader).stream()
                         .collect(
                                 Collectors.toMap(
                                         catalogTable -> catalogTable.getTableId().toTablePath(),
                                         catalogTable -> catalogTable));
-        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
-        String factoryId = getFactoryId(readonlyConfig);
-        String leftTableId =
-                readonlyConfig.getOptional(CommonOptions.SOURCE_TABLE_NAME).orElse("default");
-        List<Tuple2<CatalogTable, Action>> tableTuples = tableWithActionMap.get(leftTableId);
 
         // get factory urls
-        Set<URL> factoryUrls = new HashSet<>();
-        URL factoryUrl =
-                FactoryUtil.getFactoryUrl(
-                        FactoryUtil.discoverFactory(
-                                classLoader, TableSinkFactory.class, factoryId));
-        factoryUrls.add(factoryUrl);
-        getCatalogFactoryUrl(sinkConfig, classLoader).ifPresent(factoryUrls::add);
-
+        Set<URL> factoryUrls =
+                getFactoryUrls(readonlyConfig, classLoader, TableSinkFactory.class, factoryId);
         List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
-        for (Tuple2<CatalogTable, Action> tableTuple : tableTuples) {
-            CatalogTable catalogTable = tableTuple._1();
-            Action leftAction = tableTuple._2();
-            Optional<CatalogTable> insteadTable;
-            if (parsingMode == ParsingMode.SHARDING) {
-                insteadTable = tableMap.values().stream().findFirst();
-            } else {
-                // TODO: another table full name map
-                insteadTable =
-                        Optional.ofNullable(tableMap.get(catalogTable.getTableId().toTablePath()));
-            }
-            if (insteadTable.isPresent()) {
-                catalogTable = insteadTable.get();
-            }
-            SeaTunnelSink<?, ?, ?, ?> sink =
-                    FactoryUtil.createAndPrepareSink(
-                            catalogTable, readonlyConfig, classLoader, factoryId);
-            SinkConfig actionConfig =
-                    new SinkConfig(catalogTable.getTableId().toTablePath().toString());
-            long id = idGenerator.getNextId();
-            String actionName =
-                    JobConfigParser.createSinkActionName(
-                            configIndex,
+
+        // union
+        if (inputVertices.size() > 1) {
+            Set<Action> inputActions =
+                    inputVertices.stream()
+                            .flatMap(Collection::stream)
+                            .map(Tuple2::_2)
+                            .collect(Collectors.toSet());
+            checkProducedTypeEquals(inputActions);
+            Tuple2<CatalogTable, Action> inputActionSample = inputVertices.get(0).get(0);
+            SinkAction<?, ?, ?, ?> sinkAction =
+                    createSinkAction(
+                            inputActionSample._1(),
+                            tableMap,
+                            inputActions,
+                            readonlyConfig,
+                            classLoader,
+                            factoryUrls,
                             factoryId,
-                            String.format(
-                                    "%s(%s)", leftTableId, actionConfig.getMultipleRowTableId()));
+                            inputActionSample._2().getParallelism(),
+                            configIndex);
+            sinkActions.add(sinkAction);
+            return sinkActions;
+        }
+
+        // sink template
+        for (Tuple2<CatalogTable, Action> tuple : inputVertices.get(0)) {
             SinkAction<?, ?, ?, ?> sinkAction =
-                    new SinkAction<>(
-                            id,
-                            actionName,
-                            Collections.singletonList(leftAction),
-                            sink,
+                    createSinkAction(
+                            tuple._1(),
+                            tableMap,
+                            Collections.singleton(tuple._2()),
+                            readonlyConfig,
+                            classLoader,
                             factoryUrls,
-                            actionConfig);
-            handleSaveMode(sink);
-            sinkAction.setParallelism(leftAction.getParallelism());
+                            factoryId,
+                            tuple._2().getParallelism(),
+                            configIndex);
             sinkActions.add(sinkAction);
         }
         return sinkActions;
     }
 
-    private static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
+    private SinkAction<?, ?, ?, ?> createSinkAction(
+            CatalogTable catalogTable,
+            Map<TablePath, CatalogTable> sinkTableMap,
+            Set<Action> inputActions,
+            ReadonlyConfig readonlyConfig,
+            ClassLoader classLoader,
+            Set<URL> factoryUrls,
+            String factoryId,
+            int parallelism,
+            int configIndex) {
+        Optional<CatalogTable> insteadTable;
+        if (sinkTableMap.size() == 1) {
+            insteadTable = sinkTableMap.values().stream().findFirst();
+        } else {
+            // TODO: another table full name map
+            insteadTable =
+                    Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
+        }
+        if (insteadTable.isPresent()) {
+            catalogTable = insteadTable.get();
+        }
+        SeaTunnelSink<?, ?, ?, ?> sink =
+                FactoryUtil.createAndPrepareSink(
+                        catalogTable, readonlyConfig, classLoader, factoryId);
+        SinkConfig actionConfig =
+                new SinkConfig(catalogTable.getTableId().toTablePath().toString());
+        long id = idGenerator.getNextId();
+        String actionName =
+                JobConfigParser.createSinkActionName(
+                        configIndex, factoryId, actionConfig.getMultipleRowTableId());
+        SinkAction<?, ?, ?, ?> sinkAction =
+                new SinkAction<>(
+                        id,
+                        actionName,
+                        new ArrayList<>(inputActions),
+                        sink,
+                        factoryUrls,
+                        actionConfig);
+        handleSaveMode(sink);
+        sinkAction.setParallelism(parallelism);
+        return sinkAction;
+    }
+
+    public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
         if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
             SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
             DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
             saveModeSink.handleSaveMode(dataSaveMode);
         }
     }
-
-    private static String getFactoryId(ReadonlyConfig readonlyConfig) {
-        return readonlyConfig
-                .getOptional(CommonOptions.FACTORY_ID)
-                .orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
-    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index d5f34f3fa..0481256ce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -34,7 +34,7 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
-import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
@@ -109,7 +109,7 @@ public class TestUtils {
 
         IdGenerator idGenerator = new IdGenerator();
         ImmutablePair<List<Action>, Set<URL>> immutablePair =
-                new JobConfigParser(filePath, idGenerator, jobConfig).parse();
+                new MultipleTableJobConfigParser(filePath, idGenerator, jobConfig).parse();
 
         LogicalDagGenerator logicalDagGenerator =
                 new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
index 657888b14..334518618 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
@@ -40,9 +40,9 @@ import static org.awaitility.Awaitility.await;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 class JobHistoryServiceTest extends AbstractSeaTunnelServerTest {
 
-    private static final Long JOB_1 = 1L;
-    private static final Long JOB_2 = 2L;
-    private static final Long JOB_3 = 3L;
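+    // Time-based job IDs keep repeated test runs from colliding on fixed IDs.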
+    private static final Long JOB_1 = System.currentTimeMillis() + 1L;
+    private static final Long JOB_2 = System.currentTimeMillis() + 2L;
+    private static final Long JOB_3 = System.currentTimeMillis() + 3L;
 
     @Test
     public void testlistJobState() throws Exception {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 5e7afd3cf..5b0a75abf 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -50,20 +50,20 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 class JobMetricsTest extends AbstractSeaTunnelServerTest {
 
-    private static final Long JOB_1 = 145234L;
-    private static final Long JOB_2 = 223452L;
-    private static final Long JOB_3 = 323475L;
-
     @Test
     public void testGetJobMetrics() throws Exception {
-        startJob(JOB_1, "fake_to_console_job_metrics.conf", false);
-        startJob(JOB_2, "fake_to_console_job_metrics.conf", false);
+
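+        // time-based job IDs avoid collisions with jobs from earlier runs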
+        long jobId1 = System.currentTimeMillis() + 145234L;
+        long jobId2 = System.currentTimeMillis() + 223452L;
+
+        startJob(jobId1, "fake_to_console_job_metrics.conf", false);
+        startJob(jobId2, "fake_to_console_job_metrics.conf", false);
 
         await().atMost(60000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () -> {
                             JobMetrics jobMetrics =
-                                    server.getCoordinatorService().getJobMetrics(JOB_1);
+                                    server.getCoordinatorService().getJobMetrics(jobId1);
                             if (jobMetrics.get(SINK_WRITE_COUNT).size() > 0) {
                                 assertTrue(
                                         (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value() > 0);
@@ -75,7 +75,7 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
                             }
                         });
 
-        // waiting for JOB_1 status turn to FINISHED
+        // wait for jobId1 status to turn to FINISHED
         await().atMost(60000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () ->
@@ -86,9 +86,9 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
                                                 .contains(
                                                         String.format(
                                                                 "\"jobId\":%s,\"jobName\":\"Test\",\"jobStatus\":\"FINISHED\"",
-                                                                JOB_1))));
+                                                                jobId1))));
 
-        JobMetrics jobMetrics = server.getCoordinatorService().getJobMetrics(JOB_1);
+        JobMetrics jobMetrics = server.getCoordinatorService().getJobMetrics(jobId1);
         assertEquals(30, (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
         assertEquals(30, (Long) jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
         assertTrue((Double) jobMetrics.get(SOURCE_RECEIVED_QPS).get(0).value() > 0);
@@ -98,22 +98,25 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
     @Test
     @SuppressWarnings("checkstyle:RegexpSingleline")
     public void testMetricsOnJobRestart() throws InterruptedException {
+
+        long jobId3 = System.currentTimeMillis() + 323475L;
+
         CoordinatorService coordinatorService = server.getCoordinatorService();
-        startJob(JOB_3, "stream_fake_to_console.conf", false);
+        startJob(jobId3, "stream_fake_to_console.conf", false);
         // waiting for job status turn to running
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () ->
                                 Assertions.assertEquals(
                                         JobStatus.RUNNING,
-                                        server.getCoordinatorService().getJobStatus(JOB_3)));
+                                        server.getCoordinatorService().getJobStatus(jobId3)));
 
         Thread.sleep(10000);
 
-        System.out.println(coordinatorService.getJobMetrics(JOB_3).toJsonString());
+        System.out.println(coordinatorService.getJobMetrics(jobId3).toJsonString());
 
         // start savePoint
-        coordinatorService.savePoint(JOB_3);
+        coordinatorService.savePoint(jobId3);
 
         // waiting job FINISHED
         await().atMost(120000, TimeUnit.MILLISECONDS)
@@ -121,25 +124,27 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
                         () ->
                                 Assertions.assertEquals(
                                         JobStatus.FINISHED,
-                                        server.getCoordinatorService().getJobStatus(JOB_3)));
+                                        server.getCoordinatorService().getJobStatus(jobId3)));
 
         // restore job
-        startJob(JOB_3, "stream_fake_to_console.conf", true);
+        startJob(jobId3, "stream_fake_to_console.conf", true);
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () ->
                                 Assertions.assertEquals(
                                         JobStatus.RUNNING,
-                                        server.getCoordinatorService().getJobStatus(JOB_3)));
+                                        server.getCoordinatorService().getJobStatus(jobId3)));
 
         Thread.sleep(20000);
         // check metrics
-        JobMetrics jobMetrics = coordinatorService.getJobMetrics(JOB_3);
+        JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3);
         System.out.println(jobMetrics.toJsonString());
         assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
         assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
         assertTrue(40 < (Long) jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
         assertTrue(40 < (Long) jobMetrics.get(SOURCE_RECEIVED_COUNT).get(1).value());
+
+        server.getCoordinatorService().cancelJob(jobId3);
     }
 
     private void startJob(Long jobid, String path, boolean isStartWithSavePoint) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
index 6b0599131..9e04be8be 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
@@ -40,7 +40,7 @@ source {
   }
 
   FakeSource {
-    result_table_name = "fake"
+    result_table_name = "fake2"
     parallelism = 1
     schema = {
       fields {
@@ -56,6 +56,6 @@ transform {
 
 sink {
   console {
-    source_table_name="fake"
+    source_table_name = "fake,fake2"
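+    # the console sink unions the rows produced by both FakeSource tables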
   }
 }
\ No newline at end of file
