This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new c407ec5c2 [improve][api] add multi-table parser (#4175)
c407ec5c2 is described below

commit c407ec5c26c5a95d42e4e4f15484a01c5234ef54
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Feb 21 10:41:56 2023 +0800

    [improve][api] add multi-table parser (#4175)
---
 .../apache/seatunnel/api/common/CommonOptions.java |  30 ++++
 .../apache/seatunnel/api/env/EnvCommonOptions.java |   6 -
 .../apache/seatunnel/api/env/EnvOptionRule.java    |   3 +-
 .../seatunnel/api/sink/SinkCommonOptions.java      |  19 ---
 .../seatunnel/api/source/SourceCommonOptions.java  |  45 -----
 .../api/table/catalog/CatalogTableUtil.java        |   2 +-
 .../seatunnel/api/table/factory/FactoryUtil.java   |  18 +-
 .../api/transform/TransformCommonOptions.java      |  53 ------
 .../flink/execution/SinkExecuteProcessor.java      |   6 +-
 .../flink/execution/SourceExecuteProcessor.java    |   6 +-
 .../flink/execution/SinkExecuteProcessor.java      |   6 +-
 .../flink/execution/SourceExecuteProcessor.java    |   6 +-
 .../spark/execution/SinkExecuteProcessor.java      |  11 +-
 .../spark/execution/SourceExecuteProcessor.java    |  11 +-
 .../spark/execution/SinkExecuteProcessor.java      |  11 +-
 .../spark/execution/SourceExecuteProcessor.java    |  11 +-
 .../engine/core/parse/JobConfigParser.java         |  44 +++--
 .../core/parse/MultipleTableJobConfigParser.java   | 186 +++++++++++++++++++++
 .../common/AbstractSeaTunnelTransform.java         |   7 +-
 .../spark/source/SeaTunnelSourceSupport.java       |   6 +-
 .../spark/source/SeaTunnelSourceTable.java         |   5 +-
 21 files changed, 290 insertions(+), 202 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
index 18f46873f..66785e511 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
@@ -32,4 +32,34 @@ public interface CommonOptions {
             .stringType()
             .noDefaultValue()
             .withDescription("Name of the SPI plugin class.");
+
+    Option<String> RESULT_TABLE_NAME =
+        Options.key("result_table_name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "When result_table_name is not specified, " +
+                    "the data processed by this plugin will not be registered 
as a data set (dataStream/dataset) " +
+                    "that can be directly accessed by other plugins, or called 
a temporary table (table)" +
+                    "When result_table_name is specified, " +
+                    "the data processed by this plugin will be registered as a 
data set (dataStream/dataset) " +
+                    "that can be directly accessed by other plugins, or called 
a temporary table (table) . " +
+                    "The data set (dataStream/dataset) registered here can be 
directly accessed by other plugins " +
+                    "by specifying source_table_name .");
+
+    Option<String> SOURCE_TABLE_NAME =
+        Options.key("source_table_name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "When source_table_name is not specified, " +
+                    "the current plug-in processes the data set dataset output 
by the previous plugin in the configuration file. " +
+                    "When source_table_name is specified, the current plug-in 
is processing the data set corresponding to this parameter.");
+
+    Option<Integer> PARALLELISM =
+        Options.key("parallelism")
+            .intType()
+            .defaultValue(1)
+            .withDescription("When parallelism is not specified, the 
parallelism in env is used by default. " +
+                "When parallelism is specified, it will override the 
parallelism in env.");
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index c8b398c6f..fa454684a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -24,12 +24,6 @@ import org.apache.seatunnel.common.constants.JobMode;
 import java.util.Map;
 
 public class EnvCommonOptions {
-    public static final Option<Integer> PARALLELISM =
-        Options.key("parallelism")
-            .intType()
-            .defaultValue(1)
-            .withDescription("When parallelism is not specified in connector, 
the parallelism in env is used by default. " +
-                "When parallelism is specified, it will override the 
parallelism in env.");
 
     public static final Option<String> JOB_NAME =
         Options.key("job.name")
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index bc40247b3..ad93970bf 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.api.env;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 
 public class EnvOptionRule {
@@ -25,7 +26,7 @@ public class EnvOptionRule {
         return OptionRule.builder()
             .required(EnvCommonOptions.JOB_MODE)
             .optional(EnvCommonOptions.JOB_NAME,
-                EnvCommonOptions.PARALLELISM,
+                CommonOptions.PARALLELISM,
                 EnvCommonOptions.JARS,
                 EnvCommonOptions.CHECKPOINT_INTERVAL,
                 EnvCommonOptions.CUSTOM_PARAMETERS)
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
index cdab3566e..4bf320b49 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
@@ -17,26 +17,7 @@
 
 package org.apache.seatunnel.api.sink;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
 public class SinkCommonOptions {
 
     public static final String DATA_SAVE_MODE = "save_mode";
-
-    public static final Option<String> SOURCE_TABLE_NAME =
-        Options.key("source_table_name")
-            .stringType()
-            .noDefaultValue()
-            .withDescription(
-                "When source_table_name is not specified, " +
-                    "the current plug-in processes the data set dataset output 
by the previous plugin in the configuration file. " +
-                    "When source_table_name is specified, the current plug-in 
is processing the data set corresponding to this parameter.");
-
-    public static final Option<Integer> PARALLELISM =
-        Options.key("parallelism")
-            .intType()
-            .defaultValue(1)
-            .withDescription("When parallelism is not specified, the 
parallelism in env is used by default. " +
-                "When parallelism is specified, it will override the 
parallelism in env.");
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
deleted file mode 100644
index 6a54d88cf..000000000
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.source;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class SourceCommonOptions {
-
-    public static final Option<String> RESULT_TABLE_NAME =
-        Options.key("result_table_name")
-            .stringType()
-            .noDefaultValue()
-            .withDescription(
-                "When result_table_name is not specified, " +
-                    "the data processed by this plugin will not be registered 
as a data set (dataStream/dataset) " +
-                    "that can be directly accessed by other plugins, or called 
a temporary table (table)" +
-                    "When result_table_name is specified, " +
-                    "the data processed by this plugin will be registered as a 
data set (dataStream/dataset) " +
-                    "that can be directly accessed by other plugins, or called 
a temporary table (table) . " +
-                    "The data set (dataStream/dataset) registered here can be 
directly accessed by other plugins " +
-                    "by specifying source_table_name .");
-
-    public static final Option<Integer> PARALLELISM =
-        Options.key("parallelism")
-            .intType()
-            .defaultValue(1)
-            .withDescription("When parallelism is not specified, the 
parallelism in env is used by default. " +
-                "When parallelism is specified, it will override the 
parallelism in env.");
-}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index d4ed86d4e..344cda1f0 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -69,7 +69,7 @@ public class CatalogTableUtil implements Serializable {
         this.catalogTable = catalogTable;
     }
 
-    private static List<CatalogTable> getCatalogTables(Config config, 
ClassLoader classLoader) {
+    public static List<CatalogTable> getCatalogTables(Config config, 
ClassLoader classLoader) {
         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
         Map<String, String> catalogOptions = 
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new 
HashMap<>());
         // TODO: fallback key
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 5da8f3c48..17a4c4fd9 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.api.table.factory;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -26,7 +27,6 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -47,6 +47,8 @@ import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
 /**
  * Use SPI to create {@link TableSourceFactory}, {@link TableSinkFactory} and 
{@link CatalogFactory}.
  */
@@ -54,7 +56,7 @@ public final class FactoryUtil {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtil.class);
 
-    public static <T, SplitT extends SourceSplit, StateT extends Serializable> 
List<SeaTunnelSource<T, SplitT, StateT>> createAndPrepareSource(
+    public static <T, SplitT extends SourceSplit, StateT extends Serializable> 
List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>>> 
createAndPrepareSource(
         List<CatalogTable> multipleTables,
         ReadonlyConfig options,
         ClassLoader classLoader,
@@ -62,20 +64,20 @@ public final class FactoryUtil {
 
         try {
             final TableSourceFactory factory = discoverFactory(classLoader, 
TableSourceFactory.class, factoryIdentifier);
-            List<SeaTunnelSource<T, SplitT, StateT>> sources = new 
ArrayList<>(multipleTables.size());
+            List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, 
List<CatalogTable>>> sources = new ArrayList<>(multipleTables.size());
             if (factory instanceof SupportMultipleTable) {
                 List<CatalogTable> remainingTables = multipleTables;
                 while (!remainingTables.isEmpty()) {
                     TableFactoryContext context = new 
TableFactoryContext(remainingTables, options, classLoader);
                     SupportMultipleTable.Result result = 
((SupportMultipleTable) factory).applyTables(context);
                     List<CatalogTable> acceptedTables = 
result.getAcceptedTables();
-                    sources.add(createAndPrepareSource(factory, 
acceptedTables, options, classLoader));
+                    sources.add(new Tuple2<>(createAndPrepareSource(factory, 
acceptedTables, options, classLoader), acceptedTables));
                     remainingTables = result.getRemainingTables();
                 }
             } else {
                 for (CatalogTable catalogTable : multipleTables) {
                     List<CatalogTable> acceptedTables = 
Collections.singletonList(catalogTable);
-                    sources.add(createAndPrepareSource(factory, 
acceptedTables, options, classLoader));
+                    sources.add(new Tuple2<>(createAndPrepareSource(factory, 
acceptedTables, options, classLoader), acceptedTables));
                 }
             }
             return sources;
@@ -87,7 +89,7 @@ public final class FactoryUtil {
         }
     }
 
-    public static <T, SplitT extends SourceSplit, StateT extends Serializable> 
SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
+    private static <T, SplitT extends SourceSplit, StateT extends 
Serializable> SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
         TableSourceFactory factory,
         List<CatalogTable> acceptedTables,
         ReadonlyConfig options,
@@ -104,8 +106,8 @@ public final class FactoryUtil {
 
     public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
createAndPrepareSink(
         CatalogTable catalogTable,
-        ClassLoader classLoader,
         ReadonlyConfig options,
+        ClassLoader classLoader,
         String factoryIdentifier) {
         try {
             TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
factory = discoverFactory(classLoader, TableSinkFactory.class, 
factoryIdentifier);
@@ -208,7 +210,7 @@ public final class FactoryUtil {
             // TODO: Implement SupportParallelism in the TableSourceFactory 
instead of the SeaTunnelSource
             || SupportParallelism.class.isAssignableFrom(sourceClass)) {
             OptionRule sourceCommonOptionRule =
-                
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
+                
OptionRule.builder().optional(CommonOptions.PARALLELISM).build();
             
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
         }
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
deleted file mode 100644
index ec277d308..000000000
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.transform;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class TransformCommonOptions {
-    public static final Option<String> RESULT_TABLE_NAME =
-        Options.key("result_table_name")
-            .stringType()
-            .noDefaultValue()
-            .withDescription(
-                "When result_table_name is not specified, " +
-                    "the data processed by this plugin will not be registered 
as a data set (dataStream/dataset) " +
-                    "that can be directly accessed by other plugins, or called 
a temporary table (table)" +
-                    "When result_table_name is specified, " +
-                    "the data processed by this plugin will be registered as a 
data set (dataStream/dataset) " +
-                    "that can be directly accessed by other plugins, or called 
a temporary table (table) . " +
-                    "The data set (dataStream/dataset) registered here can be 
directly accessed by other plugins " +
-                    "by specifying source_table_name .");
-
-    public static final Option<String> SOURCE_TABLE_NAME =
-        Options.key("source_table_name")
-            .stringType()
-            .noDefaultValue()
-            .withDescription(
-                "When source_table_name is not specified, " +
-                    "the current plug-in processes the data set dataset output 
by the previous plugin in the configuration file. " +
-                    "When source_table_name is specified, the current plug-in 
is processing the data set corresponding to this parameter.");
-
-    public static final Option<Integer> PARALLELISM =
-        Options.key("parallelism")
-            .intType()
-            .noDefaultValue()
-            .withDescription("When parallelism is not specified, the 
parallelism in env is used by default. " +
-                "When parallelism is specified, it will override the 
parallelism in env.");
-}
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 6ca5a541c..00c0bb0b2 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -88,8 +88,8 @@ public class SinkExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
             DataStreamSink<Row> dataStreamSink = stream.sinkTo(new 
FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
-            if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
-                int parallelism = 
sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+            if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+                int parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
             }
         }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 8dc555ade..90a995f6e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.core.starter.flink.execution;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -73,8 +73,8 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
                 "SeaTunnel " + internalSource.getClass().getSimpleName(),
                 internalSource.getBoundedness() == 
org.apache.seatunnel.api.source.Boundedness.BOUNDED);
             Config pluginConfig = pluginConfigs.get(i);
-            if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
-                int parallelism = 
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+            if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+                int parallelism = 
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
                 sourceStream.setParallelism(parallelism);
             }
             registerResultTable(pluginConfig, sourceStream);
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 001f95db7..af328a976 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -90,8 +90,8 @@ public class SinkExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor
             }
             DataStreamSink<Row> dataStreamSink = 
stream.sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(seaTunnelSink)))
                     .name(seaTunnelSink.getPluginName());
-            if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
-                int parallelism = 
sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+            if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+                int parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
             }
         }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 8dc555ade..90a995f6e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.core.starter.flink.execution;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -73,8 +73,8 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
                 "SeaTunnel " + internalSource.getClass().getSimpleName(),
                 internalSource.getBoundedness() == 
org.apache.seatunnel.api.source.Boundedness.BOUNDED);
             Config pluginConfig = pluginConfigs.get(i);
-            if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
-                int parallelism = 
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+            if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+                int parallelism = 
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
                 sourceStream.setParallelism(parallelism);
             }
             registerResultTable(pluginConfig, sourceStream);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index fb7b25e14..af74dea4e 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,11 +17,10 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -82,13 +81,13 @@ public class SinkExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<Se
             SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
             Dataset<Row> dataset = fromSourceTable(sinkConfig, 
sparkRuntimeEnvironment).orElse(input);
             int parallelism;
-            if (sinkConfig.hasPath(SinkCommonOptions.PARALLELISM.key())) {
-                parallelism = 
sinkConfig.getInt(SinkCommonOptions.PARALLELISM.key());
+            if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+                parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
             } else {
                 parallelism = sparkRuntimeEnvironment.getSparkConf()
-                    .getInt(EnvCommonOptions.PARALLELISM.key(), 
EnvCommonOptions.PARALLELISM.defaultValue());
+                    .getInt(CommonOptions.PARALLELISM.key(), 
CommonOptions.PARALLELISM.defaultValue());
             }
-            
dataset.sparkSession().read().option(SinkCommonOptions.PARALLELISM.key(), 
parallelism);
+            
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), 
parallelism);
             // TODO modify checkpoint location
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
             if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 4ca60dada..c4af663ed 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -17,10 +17,9 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -56,16 +55,16 @@ public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<
             SeaTunnelSource<?, ?, ?> source = plugins.get(i);
             Config pluginConfig = pluginConfigs.get(i);
             int parallelism;
-            if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
-                parallelism = 
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+            if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+                parallelism = 
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
             } else {
                 parallelism = sparkRuntimeEnvironment.getSparkConf()
-                        .getInt(EnvCommonOptions.PARALLELISM.key(), 
EnvCommonOptions.PARALLELISM.defaultValue());
+                        .getInt(CommonOptions.PARALLELISM.key(), 
CommonOptions.PARALLELISM.defaultValue());
             }
             Dataset<Row> dataset = sparkRuntimeEnvironment.getSparkSession()
                 .read()
                 .format(SeaTunnelSource.class.getSimpleName())
-                .option(SourceCommonOptions.PARALLELISM.key(), parallelism)
+                .option(CommonOptions.PARALLELISM.key(), parallelism)
                 .option(Constants.SOURCE_SERIALIZATION, 
SerializationUtils.objectToString(source))
                 .schema((StructType) 
TypeConverterUtils.convert(source.getProducedType())).load();
             sources.add(dataset);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 854b14936..e5c3d6e34 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,11 +17,10 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -83,13 +82,13 @@ public class SinkExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<Se
             SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
             Dataset<Row> dataset = fromSourceTable(sinkConfig, 
sparkRuntimeEnvironment).orElse(input);
             int parallelism;
-            if (sinkConfig.hasPath(SinkCommonOptions.PARALLELISM.key())) {
-                parallelism = 
sinkConfig.getInt(SinkCommonOptions.PARALLELISM.key());
+            if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+                parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
             } else {
                 parallelism = sparkRuntimeEnvironment.getSparkConf()
-                    .getInt(EnvCommonOptions.PARALLELISM.key(), 
EnvCommonOptions.PARALLELISM.defaultValue());
+                    .getInt(CommonOptions.PARALLELISM.key(), 
CommonOptions.PARALLELISM.defaultValue());
             }
-            
dataset.sparkSession().read().option(SinkCommonOptions.PARALLELISM.key(), 
parallelism);
+            
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), 
parallelism);
             // TODO modify checkpoint location
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
             if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 4ca60dada..c4af663ed 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -17,10 +17,9 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -56,16 +55,16 @@ public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<
             SeaTunnelSource<?, ?, ?> source = plugins.get(i);
             Config pluginConfig = pluginConfigs.get(i);
             int parallelism;
-            if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
-                parallelism = 
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+            if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+                parallelism = 
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
             } else {
                 parallelism = sparkRuntimeEnvironment.getSparkConf()
-                        .getInt(EnvCommonOptions.PARALLELISM.key(), 
EnvCommonOptions.PARALLELISM.defaultValue());
+                        .getInt(CommonOptions.PARALLELISM.key(), 
CommonOptions.PARALLELISM.defaultValue());
             }
             Dataset<Row> dataset = sparkRuntimeEnvironment.getSparkSession()
                 .read()
                 .format(SeaTunnelSource.class.getSimpleName())
-                .option(SourceCommonOptions.PARALLELISM.key(), parallelism)
+                .option(CommonOptions.PARALLELISM.key(), parallelism)
                 .option(Constants.SOURCE_SERIALIZATION, 
SerializationUtils.objectToString(source))
                 .schema((StructType) 
TypeConverterUtils.convert(source.getProducedType())).load();
             sources.add(dataset);
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index a322bd374..f1ffca0a1 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -17,19 +17,17 @@
 
 package org.apache.seatunnel.engine.core.parse;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.api.transform.TransformCommonOptions;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
@@ -142,7 +140,7 @@ public class JobConfigParser {
         }
     }
 
-    private void jobConfigAnalyze(@NonNull Config envConfigs) {
+    void jobConfigAnalyze(@NonNull Config envConfigs) {
         if (envConfigs.hasPath(EnvCommonOptions.JOB_MODE.key())) {
             
jobConfig.getJobContext().setJobMode(envConfigs.getEnum(JobMode.class, 
EnvCommonOptions.JOB_MODE.key()));
         } else {
@@ -186,11 +184,11 @@ public class JobConfigParser {
                     sinkListImmutablePair.getLeft(), 
sinkListImmutablePair.getRight());
 
             actions.add(sinkAction);
-            if (!config.hasPath(SinkCommonOptions.SOURCE_TABLE_NAME.key())) {
-                throw new 
JobDefineCheckException(SinkCommonOptions.SOURCE_TABLE_NAME
+            if (!config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
+                throw new 
JobDefineCheckException(CommonOptions.SOURCE_TABLE_NAME
                     + " must be set in the sink plugin config when the job 
have complex dependencies");
             }
-            String sourceTableName = 
config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key());
+            String sourceTableName = 
config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
             List<Config> transformConfigList = 
transformResultTableNameMap.get(sourceTableName);
             SeaTunnelDataType<?> dataType;
             if (CollectionUtils.isEmpty(transformConfigList)) {
@@ -261,7 +259,7 @@ public class JobConfigParser {
 
                 action.addUpstream(transformAction);
                 SeaTunnelDataType dataType =
-                    
transformAnalyze(config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key()),
+                    
transformAnalyze(config.getString(CommonOptions.SOURCE_TABLE_NAME.key()),
                         transformAction);
                 transformListImmutablePair.getLeft().setTypeInfo(dataType);
                 dataTypeResult = 
transformListImmutablePair.getLeft().getProducedType();
@@ -274,27 +272,27 @@ public class JobConfigParser {
 
     private void initRelationMap(List<? extends Config> sourceConfigs, List<? 
extends Config> transformConfigs) {
         for (Config config : sourceConfigs) {
-            if (!config.hasPath(SourceCommonOptions.RESULT_TABLE_NAME.key())) {
-                throw new 
JobDefineCheckException(SourceCommonOptions.RESULT_TABLE_NAME.key()
+            if (!config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
+                throw new 
JobDefineCheckException(CommonOptions.RESULT_TABLE_NAME.key()
                     + " must be set in the source plugin config when the job 
have complex dependencies");
             }
-            String resultTableName = 
config.getString(SourceCommonOptions.RESULT_TABLE_NAME.key());
+            String resultTableName = 
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
             sourceResultTableNameMap.computeIfAbsent(resultTableName, k -> new 
ArrayList<>());
             sourceResultTableNameMap.get(resultTableName).add(config);
         }
 
         for (Config config : transformConfigs) {
-            if (!config.hasPath(SourceCommonOptions.RESULT_TABLE_NAME.key())) {
-                throw new 
JobDefineCheckException(SourceCommonOptions.RESULT_TABLE_NAME.key()
+            if (!config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
+                throw new 
JobDefineCheckException(CommonOptions.RESULT_TABLE_NAME.key()
                     + " must be set in the transform plugin config when the 
job have complex dependencies");
             }
 
-            if (!config.hasPath(SinkCommonOptions.SOURCE_TABLE_NAME.key())) {
-                throw new 
JobDefineCheckException(SinkCommonOptions.SOURCE_TABLE_NAME.key()
+            if (!config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
+                throw new 
JobDefineCheckException(CommonOptions.SOURCE_TABLE_NAME.key()
                     + " must be set in the transform plugin config when the 
job have complex dependencies");
             }
-            String resultTableName = 
config.getString(SourceCommonOptions.RESULT_TABLE_NAME.key());
-            String sourceTableName = 
config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key());
+            String resultTableName = 
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
+            String sourceTableName = 
config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
             if (Objects.equals(sourceTableName, resultTableName)) {
                 throw new JobDefineCheckException(String.format(
                     "Source{%s} and result{%s} table name cannot be equals", 
sourceTableName, resultTableName));
@@ -375,10 +373,10 @@ public class JobConfigParser {
     private void initTransformParallelism(List<? extends Config> 
transformConfigs, Action upstreamAction,
                                           SeaTunnelTransform 
seaTunnelTransform, TransformAction transformAction) {
         if (seaTunnelTransform instanceof PartitionSeaTunnelTransform
-            && 
transformConfigs.get(0).hasPath(TransformCommonOptions.PARALLELISM.key())) {
+            && 
transformConfigs.get(0).hasPath(CommonOptions.PARALLELISM.key())) {
             transformAction.setParallelism(transformConfigs
                 .get(0)
-                .getInt(TransformCommonOptions.PARALLELISM.key()));
+                .getInt(CommonOptions.PARALLELISM.key()));
         } else {
             // If transform type is not RePartitionTransform, Using the 
parallelism of its upstream operators.
             transformAction.setParallelism(upstreamAction.getParallelism());
@@ -386,13 +384,13 @@ public class JobConfigParser {
     }
 
     private int getSourceParallelism(Config sourceConfig) {
-        if (sourceConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
-            int sourceParallelism = 
sourceConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+        if (sourceConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+            int sourceParallelism = 
sourceConfig.getInt(CommonOptions.PARALLELISM.key());
             return Math.max(sourceParallelism, 1);
         }
         int executionParallelism = 0;
-        if (envConfigs.hasPath(EnvCommonOptions.PARALLELISM.key())) {
-            executionParallelism = 
envConfigs.getInt(EnvCommonOptions.PARALLELISM.key());
+        if (envConfigs.hasPath(CommonOptions.PARALLELISM.key())) {
+            executionParallelism = 
envConfigs.getInt(CommonOptions.PARALLELISM.key());
         }
         return Math.max(executionParallelism, 1);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
new file mode 100644
index 000000000..8902b90a6
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.parse;
+
+import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
+import 
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+public class MultipleTableJobConfigParser {
+
+    private static final ILogger LOGGER = 
Logger.getLogger(JobConfigParser.class);
+    private final String jobDefineFilePath;
+    private final IdGenerator idGenerator;
+    private final JobConfig jobConfig;
+
+    private final List<URL> commonFactoryJars;
+    private final Config seaTunnelJobConfig;
+
+    private final ReadonlyConfig envOptions;
+
+    private final Map<String, List<Tuple2<CatalogTable, Action>>> graph;
+
+    private final Set<URL> jarUrls;
+
+    public MultipleTableJobConfigParser(String jobDefineFilePath,
+                                        IdGenerator idGenerator,
+                                        JobConfig jobConfig) {
+        this(jobDefineFilePath,
+            idGenerator,
+            jobConfig,
+            Collections.emptyList());
+    }
+
+    public MultipleTableJobConfigParser(String jobDefineFilePath,
+                                        IdGenerator idGenerator,
+                                        JobConfig jobConfig,
+                                        List<URL> commonPluginJars) {
+        this.jobDefineFilePath = jobDefineFilePath;
+        this.idGenerator = idGenerator;
+        this.jobConfig = jobConfig;
+        this.commonFactoryJars = commonPluginJars;
+        this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath));
+        this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
+        this.graph = new HashMap<>();
+        this.jarUrls = new HashSet<>();
+    }
+
+    public ImmutablePair<List<Action>, Set<URL>> parse() {
+        ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(new 
ArrayList<>());
+        Thread.currentThread().setContextClassLoader(classLoader);
+        // TODO: Support configuration transform
+        List<? extends Config> sourceConfigs = 
seaTunnelJobConfig.getConfigList("source");
+        List<? extends Config> sinkConfigs = 
seaTunnelJobConfig.getConfigList("sink");
+        if (CollectionUtils.isEmpty(sourceConfigs) || 
CollectionUtils.isEmpty(sinkConfigs)) {
+            throw new JobDefineCheckException("Source And Sink can not be 
null");
+        }
+        this.fillJobConfig();
+        for (Config sourceConfig : sourceConfigs) {
+            parserSource(sourceConfig, classLoader);
+        }
+        List<Action> sinkActions = new ArrayList<>();
+        for (Config sinkConfig : sinkConfigs) {
+            sinkActions.addAll(parserSink(sinkConfig, classLoader));
+        }
+        return new ImmutablePair<>(sinkActions, null);
+    }
+
+    private void fillJobConfig() {
+        
jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
+        if (StringUtils.isEmpty(jobConfig.getName())) {
+            jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
+        }
+        envOptions.getOptional(EnvCommonOptions.CHECKPOINT_INTERVAL)
+            .ifPresent(interval -> jobConfig.getEnvOptions()
+                .put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(), interval));
+    }
+
+    public void parserSource(Config sourceConfig, ClassLoader classLoader) {
+        List<CatalogTable> catalogTables = 
CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
+        ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(sourceConfig);
+        String factoryId = 
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+        String tableId = 
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse("default");
+
+        List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>>> sources =
+            FactoryUtil.createAndPrepareSource(catalogTables, readonlyConfig, 
classLoader, factoryId);
+        // TODO: get factory jar
+        Set<URL> factoryUrls = new HashSet<>();
+        List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
+        int parallelism = getParallelism(readonlyConfig);
+        for (Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> tuple2 : sources) {
+            long id = idGenerator.getNextId();
+            SourceAction<Object, SourceSplit, Serializable> action = new 
SourceAction<>(id, factoryId, tuple2._1(), factoryUrls);
+            action.setParallelism(parallelism);
+            for (CatalogTable catalogTable : tuple2._2()) {
+                actions.add(new Tuple2<>(catalogTable, action));
+            }
+        }
+        graph.put(tableId, actions);
+    }
+
+    private int getParallelism(ReadonlyConfig config) {
+        return Math.max(1,
+            config.getOptional(CommonOptions.PARALLELISM)
+                .orElse(envOptions.get(CommonOptions.PARALLELISM)));
+    }
+
+    public List<SinkAction<?, ?, ?, ?>> parserSink(Config sinkConfig, 
ClassLoader classLoader) {
+        Map<TablePath, CatalogTable> tableMap = 
CatalogTableUtil.getCatalogTables(sinkConfig, classLoader)
+            .stream()
+            .collect(Collectors.toMap(catalogTable -> 
catalogTable.getTableId().toTablePath(), catalogTable -> catalogTable));
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
+        String factoryId = 
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+        String leftTableId = 
readonlyConfig.getOptional(CommonOptions.SOURCE_TABLE_NAME).orElse("default");
+        List<Tuple2<CatalogTable, Action>> tableTuples = 
graph.get(leftTableId);
+        // TODO: get factory jar
+        Set<URL> factoryUrls = new HashSet<>();
+        List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
+        for (Tuple2<CatalogTable, Action> tableTuple : tableTuples) {
+            CatalogTable catalogTable = tableTuple._1();
+            Action leftAction = tableTuple._2();
+            // TODO: another table full name map
+            CatalogTable insteadTable = 
tableMap.get(catalogTable.getTableId().toTablePath());
+            if (insteadTable != null) {
+                catalogTable = insteadTable;
+            }
+            SeaTunnelSink<?, ?, ?, ?> sink = 
FactoryUtil.createAndPrepareSink(catalogTable, readonlyConfig, classLoader, 
factoryId);
+            long id = idGenerator.getNextId();
+            SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>(id, 
factoryId, Collections.singletonList(leftAction), sink, factoryUrls);
+            sinkAction.setParallelism(leftAction.getParallelism());
+            sinkActions.add(sinkAction);
+        }
+        return sinkActions;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index d0de07266..b6183aeaa 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -17,9 +17,8 @@
 
 package org.apache.seatunnel.transform.common;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -31,8 +30,8 @@ import java.util.Objects;
 
 public abstract class AbstractSeaTunnelTransform implements 
SeaTunnelTransform<SeaTunnelRow> {
 
-    private static final String RESULT_TABLE_NAME = 
SourceCommonOptions.RESULT_TABLE_NAME.key();
-    private static final String SOURCE_TABLE_NAME = 
SinkCommonOptions.SOURCE_TABLE_NAME.key();
+    private static final String RESULT_TABLE_NAME = 
CommonOptions.RESULT_TABLE_NAME.key();
+    private static final String SOURCE_TABLE_NAME = 
CommonOptions.SOURCE_TABLE_NAME.key();
 
     private String inputTableName;
     private SeaTunnelRowType inputRowType;
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index ef9ae092b..cb9cb75d0 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.translation.spark.source;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -60,14 +60,14 @@ public class SeaTunnelSourceSupport implements 
DataSourceV2, ReadSupport, MicroB
     @Override
     public DataSourceReader createReader(DataSourceOptions options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = 
getSeaTunnelSource(options);
-        int parallelism = 
options.getInt(SourceCommonOptions.PARALLELISM.key(), 1);
+        int parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 1);
         return new BatchSourceReader(seaTunnelSource, parallelism);
     }
 
     @Override
     public MicroBatchReader createMicroBatchReader(Optional<StructType> 
rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = 
getSeaTunnelSource(options);
-        Integer parallelism = 
options.getInt(SourceCommonOptions.PARALLELISM.key(), 1);
+        Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 
1);
         Integer checkpointInterval = 
options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
         String checkpointPath = StringUtils.replacePattern(checkpointLocation, 
"sources/\\d+", "sources-state");
         Configuration configuration = 
SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceTable.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceTable.java
index 17395b368..d7cc7da09 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceTable.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceTable.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.translation.spark.source;
 
-import static org.apache.seatunnel.api.source.SourceCommonOptions.PARALLELISM;
-
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.Constants;
@@ -65,7 +64,7 @@ public class SeaTunnelSourceTable implements Table, 
SupportsRead {
      */
     @Override
     public ScanBuilder newScanBuilder(CaseInsensitiveStringMap 
caseInsensitiveStringMap) {
-        int parallelism = 
Integer.parseInt(properties.getOrDefault(PARALLELISM.key(), "1"));
+        int parallelism = 
Integer.parseInt(properties.getOrDefault(CommonOptions.PARALLELISM.key(), "1"));
         return new SeaTunnelScanBuilder(source, parallelism, 
caseInsensitiveStringMap);
     }
 

Reply via email to