This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 79c3de43e [Feature][API] add common options (#3353)
79c3de43e is described below

commit 79c3de43e743ca1c1b6c6aab998159951453b442
Author: Eric <[email protected]>
AuthorDate: Sat Nov 26 15:38:46 2022 +0800

    [Feature][API] add common options (#3353)
    
    * add fullOptionRule method to TableSourceFactory and TableSinkFactory
    
    * add common option rules
    
    * add checkpoint interval parameter
---
 .../apache/seatunnel/api/env/EnvCommonOptions.java | 57 ++++++++++++++++
 .../apache/seatunnel/api/env/EnvOptionRule.java    | 22 ++++---
 .../seatunnel/api/sink/SinkCommonOptions.java      | 40 ++++++++++++
 .../seatunnel/api/source/SourceCommonOptions.java  | 45 +++++++++++++
 .../seatunnel/api/table/factory/FactoryUtil.java   | 19 ++++++
 .../api/table/factory/TableSinkFactory.java        |  3 +-
 .../api/table/factory/TableSourceFactory.java      |  5 +-
 .../api/table/factory/TableTransformFactory.java   |  1 -
 .../api/transform/TransformCommonOptions.java      | 53 +++++++++++++++
 .../common/constants/CollectionConstants.java      |  8 ---
 .../apache/seatunnel/common/constants/JobMode.java |  2 +-
 .../flink/execution/SinkExecuteProcessor.java      |  6 +-
 .../flink/execution/SourceExecuteProcessor.java    |  6 +-
 .../spark/execution/SinkExecuteProcessor.java      | 11 ++--
 .../spark/execution/SourceExecuteProcessor.java    | 11 ++--
 .../seatunnel/command/ClientExecuteCommand.java    |  2 +
 .../engine/e2e/ClusterFaultToleranceIT.java        | 68 +++++++++-----------
 .../seatunnel/engine/e2e/JobExecutionIT.java       |  6 +-
 .../seatunnel/engine/client/SeaTunnelClient.java   |  6 +-
 .../engine/client/SeaTunnelClientInstance.java     |  4 +-
 .../engine/client/job/JobExecutionEnvironment.java |  3 +-
 .../engine/client/SeaTunnelClientTest.java         |  6 +-
 .../engine/core/parse/JobConfigParser.java         | 75 +++++++++++++---------
 .../engine/server/SeaTunnelServerStarter.java      | 10 +--
 .../seatunnel/engine/server/master/JobMaster.java  |  6 +-
 .../common/AbstractSeaTunnelTransform.java         |  6 +-
 .../spark/source/SeaTunnelSourceSupport.java       |  6 +-
 27 files changed, 359 insertions(+), 128 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
new file mode 100644
index 000000000..bc188c416
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.env;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import java.util.Map;
+
+public class EnvCommonOptions {
+    public static final Option<Integer> PARALLELISM =
+        Options.key("parallelism")
+            .intType()
+            .defaultValue(1)
+            .withDescription("When parallelism is not specified in connector, 
the parallelism in env is used by default. " +
+                "When parallelism is specified, it will override the 
parallelism in env.");
+
+    public static final Option<String> JOB_NAME =
+        Options.key("job.name")
+            .stringType()
+            .defaultValue("SeaTunnel Job")
+            .withDescription("The job name of this job");
+
+    public static final Option<JobMode> JOB_MODE =
+        Options.key("job.mode")
+            .enumType(JobMode.class)
+            .defaultValue(JobMode.BATCH)
+            .withDescription("The job mode of this job, support Batch and 
Stream, Default value is Batch");
+
+    public static final Option<Long> CHECKPOINT_INTERVAL =
+        Options.key("checkpoint.interval")
+            .longType()
+            .noDefaultValue()
+            .withDescription("The interval (in milliseconds) between two 
consecutive checkpoints.");
+
+    public static final Option<Map<String, String>> CUSTOM_PARAMETERS =
+        Options.key("custom_parameters")
+            .mapType()
+            .noDefaultValue()
+            .withDescription("custom parameters for run engine");
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
 b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
similarity index 61%
copy from 
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index 83b117de7..cbfd3d455 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -15,17 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.api.env;
 
-import org.apache.seatunnel.engine.client.job.JobClient;
-import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
 
-public interface SeaTunnelClientInstance {
+public class EnvOptionRule {
 
-    JobExecutionEnvironment createExecutionContext(String filePath, JobConfig 
config);
-
-    JobClient createJobClient();
-
-    void close();
+    public static OptionRule getEnvOptionRules() {
+        return OptionRule.builder()
+            .required(EnvCommonOptions.JOB_MODE)
+            .optional(EnvCommonOptions.JOB_NAME,
+                EnvCommonOptions.PARALLELISM,
+                EnvCommonOptions.CHECKPOINT_INTERVAL,
+                EnvCommonOptions.CUSTOM_PARAMETERS)
+            .build();
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
new file mode 100644
index 000000000..61db2e08d
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class SinkCommonOptions {
+
+    public static final Option<String> SOURCE_TABLE_NAME =
+        Options.key("source_table_name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "When source_table_name is not specified, " +
+                    "the current plug-in processes the data set dataset output 
by the previous plugin in the configuration file. " +
+                    "When source_table_name is specified, the current plug-in 
is processing the data set corresponding to this parameter.");
+
+    public static final Option<Integer> PARALLELISM =
+        Options.key("parallelism")
+            .intType()
+            .noDefaultValue()
+            .withDescription("When parallelism is not specified, the 
parallelism in env is used by default. " +
+                "When parallelism is specified, it will override the 
parallelism in env.");
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
new file mode 100644
index 000000000..494d34a9c
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class SourceCommonOptions {
+
+    public static final Option<String> RESULT_TABLE_NAME =
+        Options.key("result_table_name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "When result_table_name is not specified, " +
+                    "the data processed by this plugin will not be registered 
as a data set (dataStream/dataset) " +
+                    "that can be directly accessed by other plugins, or called 
a temporary table (table)" +
+                    "When result_table_name is specified, " +
+                    "the data processed by this plugin will be registered as a 
data set (dataStream/dataset) " +
+                    "that can be directly accessed by other plugins, or called 
a temporary table (table) . " +
+                    "The data set (dataStream/dataset) registered here can be 
directly accessed by other plugins " +
+                    "by specifying source_table_name .");
+
+    public static final Option<Integer> PARALLELISM =
+        Options.key("parallelism")
+            .intType()
+            .noDefaultValue()
+            .withDescription("When parallelism is not specified, the 
parallelism in env is used by default. " +
+                "When parallelism is specified, it will override the 
parallelism in env.");
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 083048c18..89a5deabe 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -17,13 +17,16 @@
 
 package org.apache.seatunnel.api.table.factory;
 
+import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSource;
 
+import lombok.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -155,4 +158,20 @@ public final class FactoryUtil {
             throw new FactoryException("Could not load service provider for 
factories.", e);
         }
     }
+
+    /**
+     * This method is called by SeaTunnel Web to get the full option rule of a 
source.
+     * @return
+     */
+    public static OptionRule sourceFullOptionRule(@NonNull Factory factory) {
+        OptionRule sourceOptionRule = factory.optionRule();
+        if (sourceOptionRule == null) {
+            throw new FactoryException("sourceOptionRule can not be null");
+        }
+
+        OptionRule sourceCommonOptionRule =
+            
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
+        
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
+        return sourceOptionRule;
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 84ea4b857..3d964f66b 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 
 /**
  * This is an SPI interface, used to create {@link TableSink}. Each plugin 
need to have it own implementation.
- * todo: now we have not use this interface, we directly use {@link 
org.apache.seatunnel.api.sink.SeaTunnelSink} as the SPI interface.
  *
  * @param <IN>                    row type
  * @param <StateT>                state type
@@ -32,11 +31,11 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT
 
     /**
      * We will never use this method now. So gave a default implement and 
return null.
+     *
      * @param context TableFactoryContext
      * @return
      */
     default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
createSink(TableFactoryContext context) {
         throw new UnsupportedOperationException("unsupported now");
     }
-
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 64eeef881..71b6ed413 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -24,15 +24,16 @@ import java.io.Serializable;
 
 /**
  * This is an SPI interface, used to create {@link TableSource}. Each plugin 
need to have it own implementation.
- * todo: now we have not use this interface, we directly use {@link 
org.apache.seatunnel.api.source.SeaTunnelSource} as the SPI interface
  */
 public interface TableSourceFactory extends Factory {
 
     /**
      * We will never use this method now. So gave a default implement and 
return null.
+     *
      * @param context TableFactoryContext
      */
-    default <T, SplitT extends SourceSplit, StateT extends Serializable> 
TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
+    default <T, SplitT extends SourceSplit, StateT extends Serializable> 
TableSource<T, SplitT, StateT> createSource(
+        TableFactoryContext context) {
         throw new UnsupportedOperationException("unsupported now");
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
index afc638ab9..4e6f11ad2 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
@@ -21,7 +21,6 @@ import 
org.apache.seatunnel.api.table.connector.TableTransform;
 
 /**
  * This is an SPI interface, used to create {@link 
org.apache.seatunnel.api.table.connector.TableTransform}. Each plugin need to 
have it own implementation.
- * todo: now we have not use this interface, we directly use {@link 
org.apache.seatunnel.api.transform.SeaTunnelTransform} as the SPI interface
  */
 public interface TableTransformFactory extends Factory {
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
new file mode 100644
index 000000000..ec277d308
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.transform;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class TransformCommonOptions {
+    public static final Option<String> RESULT_TABLE_NAME =
+        Options.key("result_table_name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "When result_table_name is not specified, " +
+                    "the data processed by this plugin will not be registered 
as a data set (dataStream/dataset) " +
+                    "that can be directly accessed by other plugins, or called 
a temporary table (table)" +
+                    "When result_table_name is specified, " +
+                    "the data processed by this plugin will be registered as a 
data set (dataStream/dataset) " +
+                    "that can be directly accessed by other plugins, or called 
a temporary table (table) . " +
+                    "The data set (dataStream/dataset) registered here can be 
directly accessed by other plugins " +
+                    "by specifying source_table_name .");
+
+    public static final Option<String> SOURCE_TABLE_NAME =
+        Options.key("source_table_name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "When source_table_name is not specified, " +
+                    "the current plug-in processes the data set dataset output 
by the previous plugin in the configuration file. " +
+                    "When source_table_name is specified, the current plug-in 
is processing the data set corresponding to this parameter.");
+
+    public static final Option<Integer> PARALLELISM =
+        Options.key("parallelism")
+            .intType()
+            .noDefaultValue()
+            .withDescription("When parallelism is not specified, the 
parallelism in env is used by default. " +
+                "When parallelism is specified, it will override the 
parallelism in env.");
+}
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
index fb387b605..ce263b257 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
@@ -19,21 +19,13 @@ package org.apache.seatunnel.common.constants;
 
 public class CollectionConstants {
 
-    public static final int MAP_SIZE = 6;
-
     public static final String PLUGIN_NAME = "plugin_name";
 
     public static final String SEATUNNEL_PLUGIN = "seatunnel";
 
-    public static final String FLINK_PLUGIN = "flink";
-
-    public static final String SPARK_PLUGIN = "spark";
-
     public static final String SOURCE_PLUGIN = "source";
 
     public static final String TRANSFORM_PLUGIN = "transform";
 
     public static final String SINK_PLUGIN = "sink";
-
-    public static final String PARALLELISM = "parallelism";
 }
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
index a7aa32611..a0df69eda 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
@@ -18,5 +18,5 @@
 package org.apache.seatunnel.common.constants;
 
 public enum JobMode {
-    BATCH, STREAMING, STRUCTURED_STREAMING
+    BATCH, STREAMING
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 97a576b90..d82f07f3c 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -76,8 +76,8 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
             DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) 
TypeConverterUtils.convert(stream.getType()));
             DataStreamSink<Row> dataStreamSink = stream.sinkTo(new 
FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
-            if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
-                int parallelism = 
sinkConfig.getInt(CollectionConstants.PARALLELISM);
+            if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
+                int parallelism = 
sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
             }
         }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 60292ad42..76031f3fc 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -21,8 +21,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.source.SupportCoordinate;
-import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -73,8 +73,8 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
                 "SeaTunnel " + internalSource.getClass().getSimpleName(),
                 internalSource.getBoundedness() == 
org.apache.seatunnel.api.source.Boundedness.BOUNDED);
             Config pluginConfig = pluginConfigs.get(i);
-            if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
-                int parallelism = 
pluginConfig.getInt(CollectionConstants.PARALLELISM);
+            if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
+                int parallelism = 
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                 sourceStream.setParallelism(parallelism);
             }
             registerResultTable(pluginConfig, sourceStream);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index c5d8276fe..c5f6faf64 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -18,9 +18,10 @@
 package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -73,12 +74,12 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
             SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
             Dataset<Row> dataset = fromSourceTable(sinkConfig, 
sparkEnvironment).orElse(input);
             int parallelism;
-            if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
-                parallelism = 
sinkConfig.getInt(CollectionConstants.PARALLELISM);
+            if (sinkConfig.hasPath(SinkCommonOptions.PARALLELISM.key())) {
+                parallelism = 
sinkConfig.getInt(SinkCommonOptions.PARALLELISM.key());
             } else {
-                parallelism = 
sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
+                parallelism = 
sparkEnvironment.getSparkConf().getInt(EnvCommonOptions.PARALLELISM.key(), 
EnvCommonOptions.PARALLELISM.defaultValue());
             }
-            
dataset.sparkSession().read().option(CollectionConstants.PARALLELISM, 
parallelism);
+            
dataset.sparkSession().read().option(SinkCommonOptions.PARALLELISM.key(), 
parallelism);
             // TODO modify checkpoint location
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
             SparkSinkInjector.inject(dataset.write(), 
seaTunnelSink).option("checkpointLocation", "/tmp").save();
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 6f2f7901c..20a5474f5 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -18,9 +18,10 @@
 package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -57,15 +58,15 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
             SeaTunnelSource<?, ?, ?> source = plugins.get(i);
             Config pluginConfig = pluginConfigs.get(i);
             int parallelism;
-            if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
-                parallelism = 
pluginConfig.getInt(CollectionConstants.PARALLELISM);
+            if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
+                parallelism = 
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
             } else {
-                parallelism = 
sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
+                parallelism = 
sparkEnvironment.getSparkConf().getInt(EnvCommonOptions.PARALLELISM.key(), 
EnvCommonOptions.PARALLELISM.defaultValue());
             }
             Dataset<Row> dataset = sparkEnvironment.getSparkSession()
                 .read()
                 .format(SeaTunnelSource.class.getSimpleName())
-                .option(CollectionConstants.PARALLELISM, parallelism)
+                .option(SourceCommonOptions.PARALLELISM.key(), parallelism)
                 .option(Constants.SOURCE_SERIALIZATION, 
SerializationUtils.objectToString(source))
                 .schema((StructType) 
TypeConverterUtils.convert(source.getProducedType())).load();
             sources.add(dataset);
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 237bce78a..3c9c8af8d 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -58,12 +58,14 @@ public class ClientExecuteCommand implements 
Command<ClientCommandArgs> {
     public void execute() throws CommandExecuteException {
         HazelcastInstance instance = null;
         SeaTunnelClient engineClient = null;
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
         try {
             String clusterName = clientCommandArgs.getClusterName();
             if 
(clientCommandArgs.getExecutionMode().equals(ExecutionMode.LOCAL)) {
                 clusterName = creatRandomClusterName(clusterName);
                 instance = createServerInLocal(clusterName);
             }
+            seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
             ClientConfig clientConfig = 
ConfigProvider.locateAndGetClientConfig();
             clientConfig.setClusterName(clusterName);
             engineClient = new SeaTunnelClient(clientConfig);
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index afd571e66..b55117bde 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
@@ -74,15 +75,15 @@ public class ClusterFaultToleranceIT {
         HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
+
         try {
-            node1 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node2 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
@@ -182,15 +183,14 @@ public class ClusterFaultToleranceIT {
         HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
         try {
-            node1 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node2 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
@@ -263,15 +263,14 @@ public class ClusterFaultToleranceIT {
         HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
         try {
-            node1 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node2 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
@@ -346,15 +345,14 @@ public class ClusterFaultToleranceIT {
         HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
         try {
-            node1 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node2 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
@@ -444,15 +442,14 @@ public class ClusterFaultToleranceIT {
         HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
         try {
-            node1 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node2 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
@@ -527,15 +524,14 @@ public class ClusterFaultToleranceIT {
         HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
         try {
-            node1 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node2 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
-            node3 = SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+            node3 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
             // waiting all node added to cluster
             HazelcastInstanceImpl finalNode = node1;
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index c483c61b5..f7532a3f9 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -65,7 +65,8 @@ public class JobExecutionIT {
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(filePath, jobConfig);
+        JobExecutionEnvironment jobExecutionEnv =
+            engineClient.createExecutionContext(filePath, jobConfig);
 
         final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
 
@@ -89,7 +90,8 @@ public class JobExecutionIT {
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(filePath, jobConfig);
+        JobExecutionEnvironment jobExecutionEnv =
+            engineClient.createExecutionContext(filePath, jobConfig);
 
         final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
         JobStatus jobStatus1 = clientJobProxy.getJobStatus();
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index bd8d2dfe8..07c7de46d 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -36,7 +36,7 @@ public class SeaTunnelClient implements 
SeaTunnelClientInstance {
     }
 
     @Override
-    public JobExecutionEnvironment createExecutionContext(@NonNull String 
filePath, JobConfig jobConfig) {
+    public JobExecutionEnvironment createExecutionContext(@NonNull String 
filePath, @NonNull JobConfig jobConfig) {
         return new JobExecutionEnvironment(jobConfig, filePath, 
hazelcastClient);
     }
 
@@ -67,14 +67,14 @@ public class SeaTunnelClient implements 
SeaTunnelClientInstance {
         }
     }
 
-    public String getJobState(Long jobId){
+    public String getJobState(Long jobId) {
         return hazelcastClient.requestOnMasterAndDecodeResponse(
             SeaTunnelGetJobStateCodec.encodeRequest(jobId),
             SeaTunnelGetJobStateCodec::decodeResponse
         );
     }
 
-    public String listJobStatus(){
+    public String listJobStatus() {
         return hazelcastClient.requestOnMasterAndDecodeResponse(
             SeaTunnelListJobStatusCodec.encodeRequest(),
             SeaTunnelListJobStatusCodec::decodeResponse
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 83b117de7..3e52d5d16 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -21,9 +21,11 @@ import org.apache.seatunnel.engine.client.job.JobClient;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 
+import lombok.NonNull;
+
 public interface SeaTunnelClientInstance {
 
-    JobExecutionEnvironment createExecutionContext(String filePath, JobConfig 
config);
+    JobExecutionEnvironment createExecutionContext(@NonNull String filePath, 
@NonNull JobConfig config);
 
     JobClient createJobClient();
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 5f9a6ffcd..985b95cc8 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -66,7 +66,8 @@ public class JobExecutionEnvironment {
 
     private final JobClient jobClient;
 
-    public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath,
+    public JobExecutionEnvironment(JobConfig jobConfig,
+                                   String jobFilePath,
                                    SeaTunnelHazelcastClient 
seaTunnelHazelcastClient) {
         this.jobConfig = jobConfig;
         this.jobFilePath = jobFilePath;
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index c5dce270e..f8c118cfe 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -47,13 +47,13 @@ import java.util.concurrent.TimeUnit;
 @DisabledOnOs(OS.WINDOWS)
 public class SeaTunnelClientTest {
 
+    private static SeaTunnelConfig SEATUNNEL_CONFIG = 
ConfigProvider.locateAndGetSeaTunnelConfig();
     private static HazelcastInstance INSTANCE;
 
     @BeforeAll
     public static void beforeClass() throws Exception {
-        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
-        
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
-        INSTANCE = 
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+        
SEATUNNEL_CONFIG.getHazelcastConfig().setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
+        INSTANCE = 
HazelcastInstanceFactory.newHazelcastInstance(SEATUNNEL_CONFIG.getHazelcastConfig(),
             Thread.currentThread().getName(),
             new 
SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 6859171e8..b4f27f9c4 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -17,20 +17,22 @@
 
 package org.apache.seatunnel.engine.core.parse;
 
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.api.transform.TransformCommonOptions;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.starter.config.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -106,7 +108,8 @@ public class JobConfigParser {
 
     public ImmutablePair<List<Action>, Set<URL>> parse() {
         List<? extends Config> sinkConfigs = 
seaTunnelJobConfig.getConfigList("sink");
-        List<? extends Config> transformConfigs = 
TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, "transform", 
Collections.emptyList());
+        List<? extends Config> transformConfigs =
+            TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, "transform", 
Collections.emptyList());
         List<? extends Config> sourceConfigs = 
seaTunnelJobConfig.getConfigList("source");
 
         if (CollectionUtils.isEmpty(sinkConfigs) || 
CollectionUtils.isEmpty(sourceConfigs)) {
@@ -135,13 +138,22 @@ public class JobConfigParser {
     }
 
     private void jobConfigAnalyze(@NonNull Config envConfigs) {
-        if (envConfigs.hasPath("job.mode")) {
-            
jobConfig.getJobContext().setJobMode(envConfigs.getEnum(JobMode.class, 
"job.mode"));
+        if (envConfigs.hasPath(EnvCommonOptions.JOB_MODE.key())) {
+            
jobConfig.getJobContext().setJobMode(envConfigs.getEnum(JobMode.class, 
EnvCommonOptions.JOB_MODE.key()));
         } else {
-            jobConfig.getJobContext().setJobMode(JobMode.BATCH);
+            
jobConfig.getJobContext().setJobMode(EnvCommonOptions.JOB_MODE.defaultValue());
         }
-        if (envConfigs.hasPath("checkpoint.interval")) {
-            
jobConfig.getEnvOptions().put(ServerConfigOptions.CHECKPOINT_INTERVAL.key(), 
envConfigs.getInt("checkpoint.interval"));
+
+        if (envConfigs.hasPath(EnvCommonOptions.JOB_NAME.key())) {
+            
jobConfig.setName(envConfigs.getString(EnvCommonOptions.JOB_NAME.key()));
+        } else {
+            jobConfig.setName(EnvCommonOptions.JOB_NAME.defaultValue());
+        }
+
+        if (envConfigs.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+            jobConfig.getEnvOptions()
+                .put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(),
+                    
envConfigs.getInt(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
         }
     }
 
@@ -167,11 +179,11 @@ public class JobConfigParser {
                     sinkListImmutablePair.getLeft(), 
sinkListImmutablePair.getRight());
 
             actions.add(sinkAction);
-            if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
-                throw new JobDefineCheckException(Plugin.SOURCE_TABLE_NAME
+            if (!config.hasPath(SinkCommonOptions.SOURCE_TABLE_NAME.key())) {
+                throw new 
JobDefineCheckException(SinkCommonOptions.SOURCE_TABLE_NAME
                     + " must be set in the sink plugin config when the job 
have complex dependencies");
             }
-            String sourceTableName = 
config.getString(Plugin.SOURCE_TABLE_NAME);
+            String sourceTableName = 
config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key());
             List<Config> transformConfigList = 
transformResultTableNameMap.get(sourceTableName);
             SeaTunnelDataType<?> dataType;
             if (CollectionUtils.isEmpty(transformConfigList)) {
@@ -235,8 +247,9 @@ public class JobConfigParser {
                     transformListImmutablePair.getRight());
 
                 action.addUpstream(transformAction);
-                SeaTunnelDataType dataType = 
transformAnalyze(config.getString(Plugin.SOURCE_TABLE_NAME),
-                    transformAction);
+                SeaTunnelDataType dataType =
+                    
transformAnalyze(config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key()),
+                        transformAction);
                 transformListImmutablePair.getLeft().setTypeInfo(dataType);
                 dataTypeResult = 
transformListImmutablePair.getLeft().getProducedType();
                 totalParallelism.set(totalParallelism.get() + 
transformAction.getParallelism());
@@ -248,27 +261,27 @@ public class JobConfigParser {
 
     private void initRelationMap(List<? extends Config> sourceConfigs, List<? 
extends Config> transformConfigs) {
         for (Config config : sourceConfigs) {
-            if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
-                throw new JobDefineCheckException(Plugin.RESULT_TABLE_NAME
+            if (!config.hasPath(SourceCommonOptions.RESULT_TABLE_NAME.key())) {
+                throw new 
JobDefineCheckException(SourceCommonOptions.RESULT_TABLE_NAME.key()
                     + " must be set in the source plugin config when the job 
have complex dependencies");
             }
-            String resultTableName = 
config.getString(Plugin.RESULT_TABLE_NAME);
+            String resultTableName = 
config.getString(SourceCommonOptions.RESULT_TABLE_NAME.key());
             sourceResultTableNameMap.computeIfAbsent(resultTableName, k -> new 
ArrayList<>());
             sourceResultTableNameMap.get(resultTableName).add(config);
         }
 
         for (Config config : transformConfigs) {
-            if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
-                throw new JobDefineCheckException(Plugin.RESULT_TABLE_NAME
+            if (!config.hasPath(SourceCommonOptions.RESULT_TABLE_NAME.key())) {
+                throw new 
JobDefineCheckException(SourceCommonOptions.RESULT_TABLE_NAME.key()
                     + " must be set in the transform plugin config when the 
job have complex dependencies");
             }
 
-            if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
-                throw new JobDefineCheckException(Plugin.SOURCE_TABLE_NAME
+            if (!config.hasPath(SinkCommonOptions.SOURCE_TABLE_NAME.key())) {
+                throw new 
JobDefineCheckException(SinkCommonOptions.SOURCE_TABLE_NAME.key()
                     + " must be set in the transform plugin config when the 
job have complex dependencies");
             }
-            String resultTableName = 
config.getString(Plugin.RESULT_TABLE_NAME);
-            String sourceTableName = 
config.getString(Plugin.SOURCE_TABLE_NAME);
+            String resultTableName = 
config.getString(SourceCommonOptions.RESULT_TABLE_NAME.key());
+            String sourceTableName = 
config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key());
             if (Objects.equals(sourceTableName, resultTableName)) {
                 throw new JobDefineCheckException(String.format(
                     "Source{%s} and result{%s} table name cannot be equals", 
sourceTableName, resultTableName));
@@ -295,7 +308,8 @@ public class JobConfigParser {
                                List<? extends Config> transformConfigs,
                                List<? extends Config> sinkConfigs) {
         ImmutablePair<SeaTunnelSource, Set<URL>> pair =
-            ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0), 
jobConfig.getJobContext(), commonPluginJars);
+            ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0), 
jobConfig.getJobContext(),
+                commonPluginJars);
         SourceAction sourceAction =
             createSourceAction(idGenerator.getNextId(), 
pair.getLeft().getPluginName(), pair.getLeft(),
                 pair.getRight());
@@ -306,7 +320,8 @@ public class JobConfigParser {
 
         if (!CollectionUtils.isEmpty(transformConfigs)) {
             ImmutablePair<SeaTunnelTransform<?>, Set<URL>> 
transformListImmutablePair =
-                
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0), 
jobConfig.getJobContext(), commonPluginJars);
+                
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0), 
jobConfig.getJobContext(),
+                    commonPluginJars);
             transformListImmutablePair.getLeft().setTypeInfo(dataType);
 
             dataType = transformListImmutablePair.getLeft().getProducedType();
@@ -341,10 +356,10 @@ public class JobConfigParser {
     private void initTransformParallelism(List<? extends Config> 
transformConfigs, Action upstreamAction,
                                           SeaTunnelTransform 
seaTunnelTransform, TransformAction transformAction) {
         if (seaTunnelTransform instanceof PartitionSeaTunnelTransform
-            && 
transformConfigs.get(0).hasPath(CollectionConstants.PARALLELISM)) {
+            && 
transformConfigs.get(0).hasPath(TransformCommonOptions.PARALLELISM.key())) {
             transformAction.setParallelism(transformConfigs
                 .get(0)
-                .getInt(CollectionConstants.PARALLELISM));
+                .getInt(TransformCommonOptions.PARALLELISM.key()));
         } else {
             // If transform type is not RePartitionTransform, Using the 
parallelism of its upstream operators.
             transformAction.setParallelism(upstreamAction.getParallelism());
@@ -352,13 +367,13 @@ public class JobConfigParser {
     }
 
     private int getSourceParallelism(Config sourceConfig) {
-        if (sourceConfig.hasPath(CollectionConstants.PARALLELISM)) {
-            int sourceParallelism = 
sourceConfig.getInt(CollectionConstants.PARALLELISM);
+        if (sourceConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
+            int sourceParallelism = 
sourceConfig.getInt(SourceCommonOptions.PARALLELISM.key());
             return Math.max(sourceParallelism, 1);
         }
         int executionParallelism = 0;
-        if (envConfigs.hasPath(CollectionConstants.PARALLELISM)) {
-            executionParallelism = 
envConfigs.getInt(CollectionConstants.PARALLELISM);
+        if (envConfigs.hasPath(EnvCommonOptions.PARALLELISM.key())) {
+            executionParallelism = 
envConfigs.getInt(EnvCommonOptions.PARALLELISM.key());
         }
         return Math.max(executionParallelism, 1);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 5a9b49507..757e489fe 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import com.hazelcast.instance.impl.HazelcastInstanceProxy;
+import lombok.NonNull;
 
 public class SeaTunnelServerStarter {
 
@@ -33,6 +34,10 @@ public class SeaTunnelServerStarter {
     public static HazelcastInstanceImpl createHazelcastInstance(String 
clusterName) {
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
         seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
+        return createHazelcastInstance(seaTunnelConfig);
+    }
+
+    public static HazelcastInstanceImpl createHazelcastInstance(@NonNull 
SeaTunnelConfig seaTunnelConfig) {
         return ((HazelcastInstanceProxy) 
HazelcastInstanceFactory.newHazelcastInstance(
             seaTunnelConfig.getHazelcastConfig(),
             
HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
@@ -41,9 +46,6 @@ public class SeaTunnelServerStarter {
 
     public static HazelcastInstanceImpl createHazelcastInstance() {
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
-        return ((HazelcastInstanceProxy) 
HazelcastInstanceFactory.newHazelcastInstance(
-            seaTunnelConfig.getHazelcastConfig(),
-            
HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
-            new SeaTunnelNodeContext(seaTunnelConfig))).getOriginal();
+        return createHazelcastInstance(seaTunnelConfig);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 76cca50ad..3a2a8ca48 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,13 +19,13 @@ package org.apache.seatunnel.engine.server.master;
 
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
 
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import 
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
-import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import 
org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -163,8 +163,8 @@ public class JobMaster extends Thread {
     // TODO replace it after ReadableConfig Support parse yaml format, then 
use only one config to read engine and env config.
     private CheckpointConfig mergeEnvAndEngineConfig(CheckpointConfig engine, 
Map<String, Object> env) {
         CheckpointConfig checkpointConfig = new CheckpointConfig();
-        if (env.containsKey(ServerConfigOptions.CHECKPOINT_INTERVAL.key())) {
-            checkpointConfig.setCheckpointInterval((Integer) 
env.get(ServerConfigOptions.CHECKPOINT_INTERVAL.key()));
+        if (env.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+            checkpointConfig.setCheckpointInterval((Integer) 
env.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
         }
         checkpointConfig.setCheckpointTimeout(engine.getCheckpointTimeout());
         
checkpointConfig.setTolerableFailureCheckpoints(engine.getTolerableFailureCheckpoints());
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index df0c543f3..d0de07266 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.transform.common;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -29,8 +31,8 @@ import java.util.Objects;
 
 public abstract class AbstractSeaTunnelTransform implements 
SeaTunnelTransform<SeaTunnelRow> {
 
-    private static final String RESULT_TABLE_NAME = "result_table_name";
-    private static final String SOURCE_TABLE_NAME = "source_table_name";
+    private static final String RESULT_TABLE_NAME = 
SourceCommonOptions.RESULT_TABLE_NAME.key();
+    private static final String SOURCE_TABLE_NAME = 
SinkCommonOptions.SOURCE_TABLE_NAME.key();
 
     private String inputTableName;
     private SeaTunnelRowType inputRowType;
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index 69b22a625..4e945d671 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -18,9 +18,9 @@
 package org.apache.seatunnel.translation.spark.source;
 
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
 import 
org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
@@ -60,14 +60,14 @@ public class SeaTunnelSourceSupport implements 
DataSourceV2, ReadSupport, MicroB
     @Override
     public DataSourceReader createReader(DataSourceOptions options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = 
getSeaTunnelSource(options);
-        int parallelism = options.getInt(CollectionConstants.PARALLELISM, 1);
+        int parallelism = 
options.getInt(SourceCommonOptions.PARALLELISM.key(), 1);
         return new BatchSourceReader(seaTunnelSource, parallelism);
     }
 
     @Override
     public MicroBatchReader createMicroBatchReader(Optional<StructType> 
rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = 
getSeaTunnelSource(options);
-        Integer parallelism = options.getInt(CollectionConstants.PARALLELISM, 
1);
+        Integer parallelism = 
options.getInt(SourceCommonOptions.PARALLELISM.key(), 1);
         Integer checkpointInterval = 
options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
         String checkpointPath = StringUtils.replacePattern(checkpointLocation, 
"sources/\\d+", "sources-state");
         Configuration configuration = 
SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();

Reply via email to