This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new e8929ab60 [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) (#2731)
e8929ab60 is described below

commit e8929ab605abb2eb0194321bfbfcbdf7e881e423
Author: ic4y <[email protected]>
AuthorDate: Thu Sep 15 09:55:04 2022 +0800

    [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) (#2731)
---
 .../{SeaTunnelContext.java => JobContext.java}     | 16 +++++-----------
 .../apache/seatunnel/api/sink/SeaTunnelSink.java   |  4 ++--
 ...nelContextAware.java => SeaTunnelJobAware.java} |  8 ++++----
 .../seatunnel/api/source/SeaTunnelSource.java      |  2 +-
 .../api/transform/SeaTunnelTransform.java          |  4 ++--
 .../seatunnel/assertion/sink/AssertSink.java       |  7 -------
 .../clickhouse/sink/client/ClickhouseSink.java     |  6 ------
 .../clickhouse/sink/file/ClickhouseFileSink.java   |  7 -------
 .../seatunnel/fake/source/FakeSource.java          | 10 +++++-----
 .../seatunnel/file/sink/AbstractFileSink.java      | 12 ++++++------
 .../connectors/seatunnel/hive/sink/HiveSink.java   | 12 ++++++------
 .../seatunnel/hive/source/HiveSource.java          |  8 --------
 .../seatunnel/http/source/HttpSource.java          | 10 +++++-----
 .../seatunnel/hudi/source/HudiSource.java          |  8 --------
 .../jdbc/internal/xa/SemanticXidGenerator.java     |  6 +++---
 .../seatunnel/jdbc/internal/xa/XaGroupOps.java     |  4 ++--
 .../seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java |  4 ++--
 .../seatunnel/jdbc/internal/xa/XidGenerator.java   |  6 +++---
 .../jdbc/sink/JdbcExactlyOnceSinkWriter.java       |  6 +++---
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   | 12 ++++++------
 .../seatunnel/jdbc/source/JdbcSource.java          |  7 -------
 .../seatunnel/kafka/source/KafkaSource.java        | 10 +++++-----
 .../seatunnel/socket/source/SocketSource.java      | 10 +++++-----
 .../execution/AbstractPluginExecuteProcessor.java  |  4 ++++
 .../starter/flink/execution/FlinkExecution.java    | 11 ++++++-----
 .../flink/execution/SinkExecuteProcessor.java      |  7 ++++---
 .../flink/execution/SourceExecuteProcessor.java    |  9 +++++----
 .../flink/execution/TransformExecuteProcessor.java |  4 +++-
 .../execution/AbstractPluginExecuteProcessor.java  |  4 ++++
 .../spark/execution/SinkExecuteProcessor.java      |  7 ++++---
 .../spark/execution/SourceExecuteProcessor.java    |  7 ++++---
 .../starter/spark/execution/SparkExecution.java    | 11 ++++++-----
 .../spark/execution/TransformExecuteProcessor.java |  4 +++-
 .../engine/client/job/ConnectorInstanceLoader.java | 16 ++++++++--------
 .../engine/client/job/JobConfigParser.java         | 22 +++++++++++-----------
 .../seatunnel/engine/common/config/JobConfig.java  |  8 ++++----
 .../apache/seatunnel/engine/server/TestUtils.java  |  8 ++++----
 .../server/checkpoint/CheckpointPlanTest.java      |  9 +++++----
 .../seatunnel/engine/server/dag/TaskTest.java      |  7 ++++---
 .../engine/server/master/JobMasterTest.java        |  7 ++++---
 40 files changed, 148 insertions(+), 176 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
similarity index 84%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
index b6152e44f..9e56de36a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
@@ -28,14 +28,12 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * This class is used to store the context of the application. e.g. the table 
schema, catalog...etc.
+ * This class is used to store the context of the job. e.g. the table schema, 
catalog...etc.
  */
-public final class SeaTunnelContext implements Serializable {
+public final class JobContext implements Serializable {
 
     private static final long serialVersionUID = -1L;
 
-    private static final SeaTunnelContext INSTANCE = new SeaTunnelContext();
-
     // tableName -> tableSchema
     private final Map<String, TableSchema> tableSchemaMap = new 
ConcurrentHashMap<>(Common.COLLECTION_SIZE);
 
@@ -43,8 +41,8 @@ public final class SeaTunnelContext implements Serializable {
 
     private final String jobId;
 
-    public static SeaTunnelContext getContext() {
-        return INSTANCE;
+    public JobContext() {
+        this.jobId = UUID.randomUUID().toString().replace("-", "");
     }
 
     /**
@@ -67,7 +65,7 @@ public final class SeaTunnelContext implements Serializable {
         return Optional.ofNullable(tableSchemaMap.get(tableName));
     }
 
-    public SeaTunnelContext setJobMode(JobMode jobMode) {
+    public JobContext setJobMode(JobMode jobMode) {
         this.jobMode = jobMode;
         return this;
     }
@@ -80,8 +78,4 @@ public final class SeaTunnelContext implements Serializable {
         return this.jobId;
     }
 
-    private SeaTunnelContext() {
-        this.jobId = UUID.randomUUID().toString().replace("-", "");
-    }
-
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 59517e409..e09d8110a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.api.sink;
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.source.SeaTunnelContextAware;
+import org.apache.seatunnel.api.source.SeaTunnelJobAware;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
@@ -43,7 +43,7 @@ import java.util.Optional;
  *                                {@link SinkAggregatedCommitter} handle it, 
this class should implement interface {@link Serializable}.
  */
 public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
-    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, 
SeaTunnelContextAware {
+    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, 
SeaTunnelJobAware {
 
     /**
      * Set the row type info of sink row data. This method will be 
automatically called by translation.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
similarity index 83%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
index 429f05155..8eb90b04f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
@@ -17,14 +17,14 @@
 
 package org.apache.seatunnel.api.source;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 
 /**
- * This interface defines the runtime environment of the SeaTunnel application.
+ * This interface defines the runtime environment of the SeaTunnel job.
  */
-public interface SeaTunnelContextAware {
+public interface SeaTunnelJobAware {
 
-    default void setSeaTunnelContext(SeaTunnelContext seaTunnelContext){
+    default void setJobContext(JobContext jobContext){
         // nothing
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index f93f4d3bf..6edac04a8 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
  * @param <StateT> The type of checkpoint states.
  */
 public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends 
Serializable>
-    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, 
SeaTunnelContextAware {
+    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, 
SeaTunnelJobAware {
 
     /**
      * Get the boundedness of this source.
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index 0a9fa44d7..8842c7595 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -19,13 +19,13 @@ package org.apache.seatunnel.api.transform;
 
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
-import org.apache.seatunnel.api.source.SeaTunnelContextAware;
+import org.apache.seatunnel.api.source.SeaTunnelJobAware;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import java.io.Serializable;
 
 public interface SeaTunnelTransform<T> extends Serializable, 
PluginIdentifierInterface,
-        SeaTunnelPluginLifeCycle, SeaTunnelContextAware {
+        SeaTunnelPluginLifeCycle, SeaTunnelJobAware {
 
     T map(T row);
 
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index dc0c397d6..11c17a1a4 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -41,7 +40,6 @@ import java.util.List;
 @AutoService(SeaTunnelSink.class)
 public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     private static final String RULES = "rules";
-    private SeaTunnelContext seaTunnelContext;
     private SeaTunnelRowType seaTunnelRowType;
     private List<AssertFieldRule> assertFieldRules;
 
@@ -73,11 +71,6 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         assertFieldRules = new AssertRuleParser().parseRules(configList);
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public String getPluginName() {
         return "Assert";
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 295547c74..b3218aaf7 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -29,7 +29,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -67,7 +66,6 @@ import java.util.Properties;
 @AutoService(SeaTunnelSink.class)
 public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, 
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
 
-    private SeaTunnelContext seaTunnelContext;
     private ReaderOption option;
 
     @Override
@@ -165,8 +163,4 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
         return this.option.getSeaTunnelRowType();
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 05c511292..6845af1a8 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -30,7 +30,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -65,7 +64,6 @@ import java.util.stream.Collectors;
 @AutoService(SeaTunnelSink.class)
 public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, 
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
 
-    private SeaTunnelContext seaTunnelContext;
     private FileReaderOption readerOption;
 
     @Override
@@ -137,9 +135,4 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
     public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
         return new ClickhouseFileSinkWriter(readerOption, context);
     }
-
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 65e6587f4..dd1167b9b 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -37,11 +37,11 @@ import com.google.auto.service.AutoService;
 public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
     private Config pluginConfig;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     @Override
     public Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+        return JobMode.BATCH.equals(jobContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
     }
 
     @Override
@@ -67,7 +67,7 @@ public class FakeSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
index 77b72f004..84827b717 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -47,7 +47,7 @@ public abstract class AbstractFileSink implements 
SeaTunnelSink<SeaTunnelRow, Fi
     private String jobId;
     private Long checkpointId;
     private SeaTunnelRowType seaTunnelRowTypeInfo;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
     private TextFileSinkConfig textFileSinkConfig;
     private SinkFileSystemPlugin sinkFileSystemPlugin;
 
@@ -72,7 +72,7 @@ public abstract class AbstractFileSink implements 
SeaTunnelSink<SeaTunnelRow, Fi
 
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
-        if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) && 
this.getSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE)) {
+        if (!jobContext.getJobMode().equals(JobMode.BATCH) && 
this.getSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE)) {
             throw new RuntimeException("only batch job can overwrite mode");
         }
 
@@ -104,9 +104,9 @@ public abstract class AbstractFileSink implements 
SeaTunnelSink<SeaTunnelRow, Fi
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-        this.jobId = seaTunnelContext.getJobId();
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+        this.jobId = jobContext.getJobId();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 4df91b1a5..20ee10423 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -49,7 +49,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, 
HiveSinkState, Hive
     private String jobId;
     private Long checkpointId;
     private SeaTunnelRowType seaTunnelRowTypeInfo;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
     private HiveSinkConfig hiveSinkConfig;
 
     @Override
@@ -76,7 +76,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, 
HiveSinkState, Hive
 
     @Override
     public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
-        if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) && 
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
 {
+        if (!jobContext.getJobMode().equals(JobMode.BATCH) && 
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
 {
             throw new RuntimeException("only batch job can overwrite hive 
table");
         }
 
@@ -96,9 +96,9 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, 
HiveSinkState, Hive
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-        this.jobId = seaTunnelContext.getJobId();
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+        this.jobId = jobContext.getJobId();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index ebaf2c51b..27108620a 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -21,7 +21,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig
 import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -48,8 +47,6 @@ import java.util.List;
 @AutoService(SeaTunnelSource.class)
 public class HiveSource implements SeaTunnelSource<SeaTunnelRow, 
HiveSourceSplit, HiveSourceState> {
 
-    private SeaTunnelContext seaTunnelContext;
-
     private SeaTunnelRowType typeInfo;
 
     private ReadStrategy readStrategy;
@@ -90,11 +87,6 @@ public class HiveSource implements 
SeaTunnelSource<SeaTunnelRow, HiveSourceSplit
         }
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
         return this.typeInfo;
diff --git 
a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 0b906144b..02ea5b264 100644
--- 
a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -24,8 +24,8 @@ import static 
org.apache.seatunnel.connectors.seatunnel.http.config.Config.METHO
 import static 
org.apache.seatunnel.connectors.seatunnel.http.config.Config.PARAMS;
 import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.URL;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -51,7 +51,7 @@ import java.util.stream.Collectors;
 public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     private final HttpSourceParameter parameter = new HttpSourceParameter();
     private SeaTunnelRowType rowType;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     @Override
     public String getPluginName() {
@@ -60,7 +60,7 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+        return JobMode.BATCH.equals(jobContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
     }
 
     @Override
@@ -93,8 +93,8 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
index 2ca69d784..f1cbb619a 100644
--- 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
@@ -25,7 +25,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceCo
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -48,8 +47,6 @@ import java.io.IOException;
 @AutoService(SeaTunnelSource.class)
 public class HudiSource implements SeaTunnelSource<SeaTunnelRow, 
HudiSourceSplit, HudiSourceState> {
 
-    private SeaTunnelContext seaTunnelContext;
-
     private SeaTunnelRowType typeInfo;
 
     private String filePath;
@@ -103,11 +100,6 @@ public class HudiSource implements 
SeaTunnelSource<SeaTunnelRow, HudiSourceSplit
 
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
         return this.typeInfo;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
index 3d2a82b3d..4f51c31a5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 
 import javax.transaction.xa.Xid;
@@ -63,7 +63,7 @@ class SemanticXidGenerator
     }
 
     @Override
-    public Xid generateXid(SeaTunnelContext context, SinkWriter.Context 
sinkContext, long checkpointId) {
+    public Xid generateXid(JobContext context, SinkWriter.Context sinkContext, 
long checkpointId) {
         byte[] jobIdBytes = context.getJobId().getBytes();
         checkArgument(jobIdBytes.length <= JOB_ID_BYTES);
         System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, JOB_ID_BYTES);
@@ -75,7 +75,7 @@ class SemanticXidGenerator
     }
 
     @Override
-    public boolean belongsToSubtask(Xid xid, SeaTunnelContext context, 
SinkWriter.Context sinkContext) {
+    public boolean belongsToSubtask(Xid xid, JobContext context, 
SinkWriter.Context sinkContext) {
         if (xid.getFormatId() != FORMAT_ID) {
             return false;
         }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
index e37e6b05a..11ebf0633 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
 
@@ -38,6 +38,6 @@ public interface XaGroupOps
 
     GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids);
 
-    void recoverAndRollback(SeaTunnelContext context, SinkWriter.Context 
sinkContext, XidGenerator xidGenerator, Xid excludeXid);
+    void recoverAndRollback(JobContext context, SinkWriter.Context 
sinkContext, XidGenerator xidGenerator, Xid excludeXid);
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
index 05ecce160..ff2012fd7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
 
@@ -112,7 +112,7 @@ public class XaGroupOpsImpl
     }
 
     @Override
-    public void recoverAndRollback(SeaTunnelContext context, 
SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
+    public void recoverAndRollback(JobContext context, SinkWriter.Context 
sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
         Collection<Xid> recovered = xaFacade.recover();
         recovered.remove(excludeXid);
         if (recovered.isEmpty()) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
index a80175054..de3ef0059 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 
 import javax.transaction.xa.Xid;
@@ -31,14 +31,14 @@ import java.security.SecureRandom;
 public interface XidGenerator
     extends Serializable, AutoCloseable {
 
-    Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext, 
long checkpointId);
+    Xid generateXid(JobContext context, SinkWriter.Context sinkContext, long 
checkpointId);
 
     default void open() {}
 
     /**
      * @return true if the provided transaction belongs to this subtask
      */
-    boolean belongsToSubtask(Xid xid, SeaTunnelContext context, 
SinkWriter.Context sinkContext);
+    boolean belongsToSubtask(Xid xid, JobContext context, SinkWriter.Context 
sinkContext);
 
     @Override
     default void close() {}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index b4527a9ff..20461db9f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
@@ -53,7 +53,7 @@ public class JdbcExactlyOnceSinkWriter
 
     private final SinkWriter.Context sinkcontext;
 
-    private final SeaTunnelContext context;
+    private final JobContext context;
 
     private final List<JdbcSinkState> recoverStates;
 
@@ -72,7 +72,7 @@ public class JdbcExactlyOnceSinkWriter
 
     public JdbcExactlyOnceSinkWriter(
         SinkWriter.Context sinkcontext,
-        SeaTunnelContext context,
+        JobContext context,
         JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
         JdbcSinkOptions jdbcSinkOptions,
         List<JdbcSinkState> states) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 672303f8c..97a63b980 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -51,7 +51,7 @@ public class JdbcSink
 
     private SeaTunnelRowType seaTunnelRowType;
 
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     private JdbcSinkOptions jdbcSinkOptions;
 
@@ -76,7 +76,7 @@ public class JdbcSink
         if (jdbcSinkOptions.isExactlyOnce()) {
             sinkWriter = new JdbcExactlyOnceSinkWriter(
                 context,
-                seaTunnelContext,
+                jobContext,
                 statementBuilder,
                 jdbcSinkOptions,
                 new ArrayList<>()
@@ -98,7 +98,7 @@ public class JdbcSink
             JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) -> 
JdbcUtils.setRecordToStatement(st, null, row);
             return new JdbcExactlyOnceSinkWriter(
                 context,
-                seaTunnelContext,
+                jobContext,
                 statementBuilder,
                 jdbcSinkOptions,
                 states
@@ -132,8 +132,8 @@ public class JdbcSink
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 2717436f1..a8d0766f3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -57,7 +56,6 @@ import java.util.Map;
 public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, 
JdbcSourceSplit, JdbcSourceState> {
     protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
 
-    private SeaTunnelContext seaTunnelContext;
     private JdbcSourceOptions jdbcSourceOptions;
     private SeaTunnelRowType typeInfo;
 
@@ -96,11 +94,6 @@ public class JdbcSource implements 
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
         );
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index a4d534e67..62314b87a 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -23,8 +23,8 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONS
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -53,11 +53,11 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
 
     private final ConsumerMetadata metadata = new ConsumerMetadata();
     private SeaTunnelRowType typeInfo;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     @Override
     public Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+        return JobMode.BATCH.equals(jobContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
     }
 
     @Override
@@ -119,7 +119,7 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
index a237679d4..f97e00e93 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.socket.source;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -38,11 +38,11 @@ import com.google.auto.service.AutoService;
 @AutoService(SeaTunnelSource.class)
 public class SocketSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     private SocketSourceParameter parameter;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     @Override
     public Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+        return JobMode.BATCH.equals(jobContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
     }
 
     @Override
@@ -56,8 +56,8 @@ public class SocketSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 
     @Override
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 5e3178bfb..1b680b9e8 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.util.TableUtil;
@@ -41,6 +42,7 @@ public abstract class AbstractPluginExecuteProcessor<T> 
implements PluginExecute
 
     protected final FlinkEnvironment flinkEnvironment;
     protected final List<? extends Config> pluginConfigs;
+    protected final JobContext jobContext;
     protected final List<T> plugins;
     protected static final String ENGINE_TYPE = "seatunnel";
     protected static final String PLUGIN_NAME = "plugin_name";
@@ -57,8 +59,10 @@ public abstract class AbstractPluginExecuteProcessor<T> 
implements PluginExecute
     };
 
     protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                             JobContext jobContext,
                                              List<? extends Config> 
pluginConfigs) {
         this.flinkEnvironment = flinkEnvironment;
+        this.jobContext = jobContext;
         this.pluginConfigs = pluginConfigs;
         this.plugins = initializePlugins(pluginConfigs);
     }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index c0cc1a67e..ec7d2a199 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.core.starter.config.EngineType;
 import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -50,10 +50,11 @@ public class FlinkExecution implements TaskExecution {
     public FlinkExecution(Config config) {
         this.config = config;
         this.flinkEnvironment = new 
EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
-        
SeaTunnelContext.getContext().setJobMode(flinkEnvironment.getJobMode());
-        this.sourcePluginExecuteProcessor = new 
SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
-        this.transformPluginExecuteProcessor = new 
TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
-        this.sinkPluginExecuteProcessor = new 
SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(flinkEnvironment.getJobMode());
+        this.sourcePluginExecuteProcessor = new 
SourceExecuteProcessor(flinkEnvironment, jobContext, 
config.getConfigList("source"));
+        this.transformPluginExecuteProcessor = new 
TransformExecuteProcessor(flinkEnvironment, jobContext, 
config.getConfigList("transform"));
+        this.sinkPluginExecuteProcessor = new 
SinkExecuteProcessor(flinkEnvironment, jobContext, 
config.getConfigList("sink"));
     }
 
     @Override
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 8de5d422d..6a06516d5 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -46,8 +46,9 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
     private static final String PLUGIN_TYPE = "sink";
 
     protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                   JobContext jobContext,
                                    List<? extends Config> pluginConfigs) {
-        super(flinkEnvironment, pluginConfigs);
+        super(flinkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override
@@ -60,7 +61,7 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable> seaTunnelSink =
                 sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
-            seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+            seaTunnelSink.setJobContext(jobContext);
             return seaTunnelSink;
         }).distinct().collect(Collectors.toList());
         flinkEnvironment.registerPlugin(pluginJars);
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index a1b31836f..1dc378a83 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.common.constants.JobMode;
@@ -53,8 +53,9 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
     private static final String PLUGIN_TYPE = "source";
 
     public SourceExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                  JobContext jobContext,
                                   List<? extends Config> sourceConfigs) {
-        super(flinkEnvironment, sourceConfigs);
+        super(flinkEnvironment, jobContext, sourceConfigs);
     }
 
     @Override
@@ -110,8 +111,8 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
             
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSource seaTunnelSource = 
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
-            seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
-            if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
+            seaTunnelSource.setJobContext(jobContext);
+            if (jobContext.getJobMode() == JobMode.BATCH
                 && seaTunnelSource.getBoundedness() == 
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
                 throw new UnsupportedOperationException(String.format("'%s' 
source don't support off-line job.", seaTunnelSource.getPluginName()));
             }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index d077f6d3a..62d65fbcd 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
@@ -39,8 +40,9 @@ public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<Fl
     private static final String PLUGIN_TYPE = "transform";
 
     protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                        JobContext jobContext,
                                         List<? extends Config> pluginConfigs) {
-        super(flinkEnvironment, pluginConfigs);
+        super(flinkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
index 0dad6f680..af087c0c4 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -34,13 +35,16 @@ public abstract class AbstractPluginExecuteProcessor<T> 
implements PluginExecute
 
     protected final SparkEnvironment sparkEnvironment;
     protected final List<? extends Config> pluginConfigs;
+    protected final JobContext jobContext;
     protected final List<T> plugins;
     protected static final String ENGINE_TYPE = "seatunnel";
     protected static final String PLUGIN_NAME = "plugin_name";
 
     protected AbstractPluginExecuteProcessor(SparkEnvironment sparkEnvironment,
+                                             JobContext jobContext,
                                              List<? extends Config> 
pluginConfigs) {
         this.sparkEnvironment = sparkEnvironment;
+        this.jobContext = jobContext;
         this.pluginConfigs = pluginConfigs;
         this.plugins = initializePlugins(pluginConfigs);
     }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 974d0fcf0..b59bbc621 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -43,8 +43,9 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
     private static final String PLUGIN_TYPE = "sink";
 
     protected SinkExecuteProcessor(SparkEnvironment sparkEnvironment,
+                                   JobContext jobContext,
                                    List<? extends Config> pluginConfigs) {
-        super(sparkEnvironment, pluginConfigs);
+        super(sparkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override
@@ -56,7 +57,7 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
             
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
-            seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+            seaTunnelSink.setJobContext(jobContext);
             return seaTunnelSink;
         }).distinct().collect(Collectors.toList());
         sparkEnvironment.registerPlugin(pluginJars);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index f8f8a59b8..b76ea31f2 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -43,8 +43,9 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
     private static final String PLUGIN_TYPE = "source";
 
     public SourceExecuteProcessor(SparkEnvironment sparkEnvironment,
+                                  JobContext jobContext,
                                   List<? extends Config> sourceConfigs) {
-        super(sparkEnvironment, sourceConfigs);
+        super(sparkEnvironment, jobContext, sourceConfigs);
     }
 
     @Override
@@ -74,7 +75,7 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
             
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSource<?, ?, ?> seaTunnelSource = 
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
-            seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+            seaTunnelSource.setJobContext(jobContext);
             sources.add(seaTunnelSource);
         }
         sparkEnvironment.registerPlugin(new ArrayList<>(jars));
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 7b90bd0dd..0bb57268c 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.core.starter.config.EngineType;
 import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -46,10 +46,11 @@ public class SparkExecution {
     public SparkExecution(Config config) {
         this.config = config;
         this.sparkEnvironment = (SparkEnvironment) new 
EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
-        
SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
-        this.sourcePluginExecuteProcessor = new 
SourceExecuteProcessor(sparkEnvironment, config.getConfigList("source"));
-        this.transformPluginExecuteProcessor = new 
TransformExecuteProcessor(sparkEnvironment, config.getConfigList("transform"));
-        this.sinkPluginExecuteProcessor = new 
SinkExecuteProcessor(sparkEnvironment, config.getConfigList("sink"));
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(sparkEnvironment.getJobMode());
+        this.sourcePluginExecuteProcessor = new 
SourceExecuteProcessor(sparkEnvironment, jobContext, 
config.getConfigList("source"));
+        this.transformPluginExecuteProcessor = new 
TransformExecuteProcessor(sparkEnvironment, jobContext, 
config.getConfigList("transform"));
+        this.sinkPluginExecuteProcessor = new 
SinkExecuteProcessor(sparkEnvironment, jobContext, 
config.getConfigList("sink"));
     }
 
     public void execute() throws TaskExecuteException {
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 5668457b7..66d5e1ee6 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
@@ -39,8 +40,9 @@ public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<Ba
     private static final String PLUGIN_TYPE = "transform";
 
     protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment,
+                                        JobContext jobContext,
                                         List<? extends Config> pluginConfigs) {
-        super(sparkEnvironment, pluginConfigs);
+        super(sparkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
index c62ca2871..44f6aa4f5 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.client.job;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -47,7 +47,7 @@ public class ConnectorInstanceLoader {
     }
 
     public static ImmutablePair<SeaTunnelSource, Set<URL>> 
loadSourceInstance(Config sourceConfig,
-                                                                              
SeaTunnelContext seaTunnelContext) {
+                                                                              
JobContext jobContext) {
         SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new 
SeaTunnelSourcePluginDiscovery();
         PluginIdentifier pluginIdentifier = PluginIdentifier.of(
             CollectionConstants.SEATUNNEL_PLUGIN,
@@ -58,8 +58,8 @@ public class ConnectorInstanceLoader {
 
         SeaTunnelSource seaTunnelSource = 
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
         seaTunnelSource.prepare(sourceConfig);
-        seaTunnelSource.setSeaTunnelContext(seaTunnelContext);
-        if (seaTunnelContext.getJobMode() == JobMode.BATCH
+        seaTunnelSource.setJobContext(jobContext);
+        if (jobContext.getJobMode() == JobMode.BATCH
             && seaTunnelSource.getBoundedness() == 
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
             throw new UnsupportedOperationException(
                 String.format("'%s' source don't support off-line job.", 
seaTunnelSource.getPluginName()));
@@ -68,7 +68,7 @@ public class ConnectorInstanceLoader {
     }
 
     public static ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, 
Serializable, Serializable>, Set<URL>> loadSinkInstance(
-        Config sinkConfig, SeaTunnelContext seaTunnelContext) {
+        Config sinkConfig, JobContext jobContext) {
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
         PluginIdentifier pluginIdentifier = PluginIdentifier.of(
             CollectionConstants.SEATUNNEL_PLUGIN,
@@ -79,12 +79,12 @@ public class ConnectorInstanceLoader {
             sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
         seaTunnelSink.prepare(sinkConfig);
         seaTunnelSink.setTypeInfo(null);
-        seaTunnelSink.setSeaTunnelContext(seaTunnelContext);
+        seaTunnelSink.setJobContext(jobContext);
         return new ImmutablePair<>(seaTunnelSink, new 
HashSet<>(pluginJarPaths));
     }
 
     public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>> 
loadTransformInstance(Config transformConfig,
-                                                                               
        SeaTunnelContext seaTunnelContext) {
+                                                                               
        JobContext jobContext) {
         SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new 
SeaTunnelTransformPluginDiscovery();
         PluginIdentifier pluginIdentifier = PluginIdentifier.of(
             CollectionConstants.SEATUNNEL_PLUGIN,
@@ -95,7 +95,7 @@ public class ConnectorInstanceLoader {
         SeaTunnelTransform<?> seaTunnelTransform =
             transformPluginDiscovery.createPluginInstance(pluginIdentifier);
         seaTunnelTransform.prepare(transformConfig);
-        seaTunnelTransform.setSeaTunnelContext(seaTunnelContext);
+        seaTunnelTransform.setJobContext(jobContext);
         return new ImmutablePair<>(seaTunnelTransform, new 
HashSet<>(pluginJarPaths));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index 964bc5593..608838e9b 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.client.job;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -109,13 +109,13 @@ public class JobConfigParser {
     }
 
     private void jobConfigAnalyze(@NonNull Config envConfigs) {
-        SeaTunnelContext context = SeaTunnelContext.getContext();
+        JobContext jobContext = new JobContext();
         if (envConfigs.hasPath("job.mode")) {
-            context.setJobMode(envConfigs.getEnum(JobMode.class, "job.mode"));
+            jobContext.setJobMode(envConfigs.getEnum(JobMode.class, 
"job.mode"));
         } else {
-            context.setJobMode(JobMode.BATCH);
+            jobContext.setJobMode(JobMode.BATCH);
         }
-        jobConfig.setSeaTunnelContext(context);
+        jobConfig.setJobContext(jobContext);
     }
 
     /**
@@ -133,7 +133,7 @@ public class JobConfigParser {
         for (Config config : sinkConfigs) {
             ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, 
Serializable, Serializable>, Set<URL>>
                 sinkListImmutablePair =
-                ConnectorInstanceLoader.loadSinkInstance(config, 
jobConfig.getSeaTunnelContext());
+                ConnectorInstanceLoader.loadSinkInstance(config, 
jobConfig.getJobContext());
 
             SinkAction sinkAction =
                 createSinkAction(idGenerator.getNextId(), 
sinkListImmutablePair.getLeft().getPluginName(),
@@ -173,7 +173,7 @@ public class JobConfigParser {
         AtomicInteger totalParallelism = new AtomicInteger();
         for (Config sourceConfig : sourceConfigList) {
             ImmutablePair<SeaTunnelSource, Set<URL>> 
seaTunnelSourceListImmutablePair =
-                ConnectorInstanceLoader.loadSourceInstance(sourceConfig, 
jobConfig.getSeaTunnelContext());
+                ConnectorInstanceLoader.loadSourceInstance(sourceConfig, 
jobConfig.getJobContext());
             dataType = 
seaTunnelSourceListImmutablePair.getLeft().getProducedType();
             SourceAction sourceAction = createSourceAction(
                 idGenerator.getNextId(),
@@ -200,7 +200,7 @@ public class JobConfigParser {
             SeaTunnelDataType<?> dataTypeResult = null;
             for (Config config : transformConfigList) {
                 ImmutablePair<SeaTunnelTransform<?>, Set<URL>> 
transformListImmutablePair =
-                    ConnectorInstanceLoader.loadTransformInstance(config, 
jobConfig.getSeaTunnelContext());
+                    ConnectorInstanceLoader.loadTransformInstance(config, 
jobConfig.getJobContext());
                 TransformAction transformAction = createTransformAction(
                     idGenerator.getNextId(),
                     transformListImmutablePair.getLeft().getPluginName(),
@@ -264,20 +264,20 @@ public class JobConfigParser {
                                List<? extends Config> transformConfigs,
                                List<? extends Config> sinkConfigs) {
         ImmutablePair<SeaTunnelSource, Set<URL>> pair =
-            ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0), 
jobConfig.getSeaTunnelContext());
+            ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0), 
jobConfig.getJobContext());
         SourceAction sourceAction =
             createSourceAction(idGenerator.getNextId(), 
pair.getLeft().getPluginName(), pair.getLeft(),
                 pair.getRight());
         
sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
         SeaTunnelDataType dataType = 
sourceAction.getSource().getProducedType();
         ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>, Set<URL>>
-            sinkListImmutablePair = 
ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), 
jobConfig.getSeaTunnelContext());
+            sinkListImmutablePair = 
ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), 
jobConfig.getJobContext());
 
         Action sinkUpstreamAction = sourceAction;
 
         if (!CollectionUtils.isEmpty(transformConfigs)) {
             ImmutablePair<SeaTunnelTransform<?>, Set<URL>> 
transformListImmutablePair =
-                
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0), 
jobConfig.getSeaTunnelContext());
+                
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0), 
jobConfig.getJobContext());
             transformListImmutablePair.getLeft().setTypeInfo(dataType);
 
             dataType = transformListImmutablePair.getLeft().getProducedType();
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index 2b31a1115..2fb28b14d 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.common.config;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import 
org.apache.seatunnel.engine.common.serializeable.ConfigDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -30,7 +30,7 @@ import java.io.IOException;
 @Data
 public class JobConfig implements IdentifiedDataSerializable {
     private String name;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     @Override
     public int getFactoryId() {
@@ -45,12 +45,12 @@ public class JobConfig implements 
IdentifiedDataSerializable {
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
         out.writeString(name);
-        out.writeObject(seaTunnelContext);
+        out.writeObject(jobContext);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
         this.name = in.readString();
-        this.seaTunnelContext = in.readObject();
+        this.jobContext = in.readObject();
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 518f409c3..dbd2ef76d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.server;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -36,10 +36,10 @@ import java.net.URL;
 public class TestUtils {
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    public static LogicalDag getTestLogicalDag() throws MalformedURLException {
+    public static LogicalDag getTestLogicalDag(JobContext jobContext) throws 
MalformedURLException {
         IdGenerator idGenerator = new IdGenerator();
         FakeSource fakeSource = new FakeSource();
-        fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+        fakeSource.setJobContext(jobContext);
 
         Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", 
fakeSource,
             Sets.newHashSet(new URL("file:///fake.jar")));
@@ -47,7 +47,7 @@ public class TestUtils {
         LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
 
         ConsoleSink consoleSink = new ConsoleSink();
-        consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+        consoleSink.setJobContext(jobContext);
         Action console = new SinkAction<>(idGenerator.getNextId(), "console", 
consoleSink,
             Sets.newHashSet(new URL("file:///console.jar")));
         console.setParallelism(3);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 5fb54aaad..1913f0c84 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -77,16 +77,17 @@ public class CheckpointPlanTest extends 
AbstractSeaTunnelServerTest {
     }
 
     private static void fillVirtualVertex(IdGenerator idGenerator, LogicalDag 
logicalDag, int parallelism) {
-        SeaTunnelContext.getContext().setJobMode(JobMode.BATCH);
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(JobMode.BATCH);
         FakeSource fakeSource = new FakeSource();
-        fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+        fakeSource.setJobContext(jobContext);
 
         Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", 
fakeSource, Collections.emptySet());
         fake.setParallelism(parallelism);
         LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 
parallelism);
 
         ConsoleSink consoleSink = new ConsoleSink();
-        consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+        consoleSink.setJobContext(jobContext);
         Action console = new SinkAction<>(idGenerator.getNextId(), "console", 
consoleSink, Collections.emptySet());
         console.setParallelism(parallelism);
         LogicalVertex consoleVertex = new LogicalVertex(console.getId(), 
console, parallelism);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 125776a29..d9bb880da 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -50,8 +50,9 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
 
     @Test
     public void testTask() throws MalformedURLException {
-        SeaTunnelContext.getContext().setJobMode(JobMode.BATCH);
-        LogicalDag testLogicalDag = TestUtils.getTestLogicalDag();
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(JobMode.BATCH);
+        LogicalDag testLogicalDag = TestUtils.getTestLogicalDag(jobContext);
 
         JobConfig config = new JobConfig();
         config.setName("test");
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 227c347e7..5c98b4bc2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.master;
 
 import static org.awaitility.Awaitility.await;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -52,8 +52,9 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
 
     @Test
     public void testHandleCheckpointTimeout() throws Exception {
-        SeaTunnelContext.getContext().setJobMode(JobMode.STREAMING);
-        LogicalDag testLogicalDag = TestUtils.getTestLogicalDag();
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(JobMode.STREAMING);
+        LogicalDag testLogicalDag = TestUtils.getTestLogicalDag(jobContext);
         JobConfig config = new JobConfig();
         config.setName("test_checkpoint_timeout");
 

Reply via email to