This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cbf82f755 [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706)
cbf82f755 is described below

commit cbf82f755ca3f905b6b0fad8c2ab2d0d1c5bf36a
Author: ic4y <[email protected]>
AuthorDate: Wed Sep 14 16:36:58 2022 +0800

    [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706)
    
    * remove singleton pattern for SeaTunnelContext
    
    * rename SeaTunnelContext to JobContext and fix checkstyle
    
    * fix checkstyle
    
    * put the constructor on top
---
 .../{SeaTunnelContext.java => JobContext.java}     | 16 +++++--------
 .../apache/seatunnel/api/sink/SeaTunnelSink.java   |  4 ++--
 ...nelContextAware.java => SeaTunnelJobAware.java} |  8 +++----
 .../seatunnel/api/source/SeaTunnelSource.java      |  2 +-
 .../seatunnel/assertion/sink/AssertSink.java       |  7 ------
 .../clickhouse/sink/client/ClickhouseSink.java     |  6 -----
 .../clickhouse/sink/file/ClickhouseFileSink.java   |  7 ------
 .../elasticsearch/client/EsRestClient.java         | 26 ++++++++++++----------
 .../seatunnel/elasticsearch/config/SinkConfig.java |  6 ++---
 .../elasticsearch/constant/BulkConfig.java         |  2 ++
 .../serialize/ElasticsearchRowSerializer.java      | 19 ++++++++--------
 .../serialize/SeaTunnelRowSerializer.java          |  1 -
 .../index/impl/VariableIndexSerializer.java        |  4 ++--
 .../serialize/type/IndexTypeSerializer.java        |  1 -
 .../serialize/type/IndexTypeSerializerFactory.java |  6 +++--
 .../type/impl/NotIndexTypeSerializer.java          |  1 -
 .../elasticsearch/sink/ElasticsearchSink.java      | 16 ++++---------
 .../sink/ElasticsearchSinkWriter.java              |  7 +++---
 .../seatunnel/elasticsearch/util/RegexUtils.java   |  1 -
 .../seatunnel/fake/source/FakeSource.java          | 10 ++++-----
 .../seatunnel/file/sink/AbstractFileSink.java      | 12 +++++-----
 .../seatunnel/file/sink/BaseFileSink.java          | 10 ++++-----
 .../connectors/seatunnel/hive/sink/HiveSink.java   | 12 +++++-----
 .../seatunnel/hive/source/HiveSource.java          |  8 -------
 .../seatunnel/http/source/HttpSource.java          | 10 ++++-----
 .../seatunnel/hudi/source/HudiSource.java          |  8 -------
 .../seatunnel/iotdb/source/IoTDBSource.java        |  4 ++--
 .../jdbc/internal/xa/SemanticXidGenerator.java     |  6 ++---
 .../seatunnel/jdbc/internal/xa/XaGroupOps.java     |  4 ++--
 .../seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java |  4 ++--
 .../seatunnel/jdbc/internal/xa/XidGenerator.java   |  6 ++---
 .../jdbc/sink/JdbcExactlyOnceSinkWriter.java       |  6 ++---
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   | 12 +++++-----
 .../seatunnel/jdbc/source/JdbcSource.java          |  7 ------
 .../seatunnel/kafka/source/KafkaSource.java        | 10 ++++-----
 .../seatunnel/kudu/source/KuduSource.java          |  7 ------
 .../seatunnel/mongodb/source/MongodbSource.java    |  8 -------
 .../seatunnel/redis/source/RedisSource.java        |  7 ------
 .../seatunnel/socket/source/SocketSource.java      | 10 ++++-----
 .../execution/AbstractPluginExecuteProcessor.java  |  4 ++++
 .../starter/flink/execution/FlinkExecution.java    | 11 ++++-----
 .../flink/execution/SinkExecuteProcessor.java      |  7 +++---
 .../flink/execution/SourceExecuteProcessor.java    |  9 ++++----
 .../flink/execution/TransformExecuteProcessor.java |  4 +++-
 .../execution/AbstractPluginExecuteProcessor.java  |  4 ++++
 .../spark/execution/SinkExecuteProcessor.java      |  7 +++---
 .../spark/execution/SourceExecuteProcessor.java    |  7 +++---
 .../starter/spark/execution/SparkExecution.java    | 11 ++++-----
 .../spark/execution/TransformExecuteProcessor.java |  4 +++-
 49 files changed, 156 insertions(+), 213 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
 b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
similarity index 84%
rename from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
index b6152e44f..9e56de36a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
@@ -28,14 +28,12 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * This class is used to store the context of the application. e.g. the table 
schema, catalog...etc.
+ * This class is used to store the context of the job. e.g. the table schema, 
catalog...etc.
  */
-public final class SeaTunnelContext implements Serializable {
+public final class JobContext implements Serializable {
 
     private static final long serialVersionUID = -1L;
 
-    private static final SeaTunnelContext INSTANCE = new SeaTunnelContext();
-
     // tableName -> tableSchema
     private final Map<String, TableSchema> tableSchemaMap = new 
ConcurrentHashMap<>(Common.COLLECTION_SIZE);
 
@@ -43,8 +41,8 @@ public final class SeaTunnelContext implements Serializable {
 
     private final String jobId;
 
-    public static SeaTunnelContext getContext() {
-        return INSTANCE;
+    public JobContext() {
+        this.jobId = UUID.randomUUID().toString().replace("-", "");
     }
 
     /**
@@ -67,7 +65,7 @@ public final class SeaTunnelContext implements Serializable {
         return Optional.ofNullable(tableSchemaMap.get(tableName));
     }
 
-    public SeaTunnelContext setJobMode(JobMode jobMode) {
+    public JobContext setJobMode(JobMode jobMode) {
         this.jobMode = jobMode;
         return this;
     }
@@ -80,8 +78,4 @@ public final class SeaTunnelContext implements Serializable {
         return this.jobId;
     }
 
-    private SeaTunnelContext() {
-        this.jobId = UUID.randomUUID().toString().replace("-", "");
-    }
-
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 59517e409..e09d8110a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.api.sink;
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.source.SeaTunnelContextAware;
+import org.apache.seatunnel.api.source.SeaTunnelJobAware;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
@@ -43,7 +43,7 @@ import java.util.Optional;
  *                                {@link SinkAggregatedCommitter} handle it, 
this class should implement interface {@link Serializable}.
  */
 public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
-    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, 
SeaTunnelContextAware {
+    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, 
SeaTunnelJobAware {
 
     /**
      * Set the row type info of sink row data. This method will be 
automatically called by translation.
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
similarity index 83%
rename from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
index 429f05155..8eb90b04f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
@@ -17,14 +17,14 @@
 
 package org.apache.seatunnel.api.source;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 
 /**
- * This interface defines the runtime environment of the SeaTunnel application.
+ * This interface defines the runtime environment of the SeaTunnel job.
  */
-public interface SeaTunnelContextAware {
+public interface SeaTunnelJobAware {
 
-    default void setSeaTunnelContext(SeaTunnelContext seaTunnelContext){
+    default void setJobContext(JobContext jobContext){
         // nothing
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index f93f4d3bf..6edac04a8 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
  * @param <StateT> The type of checkpoint states.
  */
 public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends 
Serializable>
-    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, 
SeaTunnelContextAware {
+    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, 
SeaTunnelJobAware {
 
     /**
      * Get the boundedness of this source.
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index dc0c397d6..11c17a1a4 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -41,7 +40,6 @@ import java.util.List;
 @AutoService(SeaTunnelSink.class)
 public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     private static final String RULES = "rules";
-    private SeaTunnelContext seaTunnelContext;
     private SeaTunnelRowType seaTunnelRowType;
     private List<AssertFieldRule> assertFieldRules;
 
@@ -73,11 +71,6 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         assertFieldRules = new AssertRuleParser().parseRules(configList);
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public String getPluginName() {
         return "Assert";
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index c296ca03a..3c2c1104d 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -29,7 +29,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -67,7 +66,6 @@ import java.util.Properties;
 @AutoService(SeaTunnelSink.class)
 public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, 
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
 
-    private SeaTunnelContext seaTunnelContext;
     private ReaderOption option;
 
     @Override
@@ -194,8 +192,4 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
         return this.option.getSeaTunnelRowType();
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 62694c3fe..319937ac1 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -30,7 +30,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -65,7 +64,6 @@ import java.util.stream.Collectors;
 @AutoService(SeaTunnelSink.class)
 public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, 
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
 
-    private SeaTunnelContext seaTunnelContext;
     private FileReaderOption readerOption;
 
     @Override
@@ -140,9 +138,4 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
     public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
         return new ClickhouseFileSinkWriter(readerOption, context);
     }
-
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 5a2a3df09..661d5b7ca 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
 
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
@@ -27,9 +31,6 @@ import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.util.EntityUtils;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
@@ -40,13 +41,14 @@ import java.util.List;
 
 public class EsRestClient {
 
-    private static EsRestClient esRestClient;
-    private static RestClient restClient;
+    private static EsRestClient ES_REST_CLIENT;
+    private static RestClient REST_CLIENT;
 
     private EsRestClient() {
 
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     private static RestClientBuilder getRestClientBuilder(List<String> hosts, 
String username, String password) {
         HttpHost[] httpHosts = new HttpHost[hosts.size()];
         for (int i = 0; i < hosts.size(); i++) {
@@ -68,19 +70,19 @@ public class EsRestClient {
     }
 
     public static EsRestClient getInstance(List<String> hosts, String 
username, String password) {
-        if (restClient == null) {
+        if (REST_CLIENT == null) {
             RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, 
username, password);
-            restClient = restClientBuilder.build();
-            esRestClient = new EsRestClient();
+            REST_CLIENT = restClientBuilder.build();
+            ES_REST_CLIENT = new EsRestClient();
         }
-        return esRestClient;
+        return ES_REST_CLIENT;
     }
 
     public BulkResponse bulk(String requestBody) {
         Request request = new Request("POST", "_bulk");
         request.setJsonEntity(requestBody);
         try {
-            Response response = restClient.performRequest(request);
+            Response response = REST_CLIENT.performRequest(request);
             if (response == null) {
                 throw new BulkElasticsearchException("bulk es Response is 
null");
             }
@@ -105,7 +107,7 @@ public class EsRestClient {
     public static String getClusterVersion() {
         Request request = new Request("GET", "/");
         try {
-            Response response = restClient.performRequest(request);
+            Response response = REST_CLIENT.performRequest(request);
             String result = EntityUtils.toString(response.getEntity());
             ObjectMapper objectMapper = new ObjectMapper();
             JsonNode jsonNode = objectMapper.readTree(result);
@@ -117,7 +119,7 @@ public class EsRestClient {
     }
 
     public void close() throws IOException {
-        restClient.close();
+        REST_CLIENT.close();
     }
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index f747fad85..6dc753bb2 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -35,11 +35,11 @@ public class SinkConfig {
 
     public static final String MAX_RETRY_SIZE = "max_retry_size";
 
-    public static void 
setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig){
-        if(pluginConfig.hasPath(MAX_BATCH_SIZE)){
+    public static void 
setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
+        if (pluginConfig.hasPath(MAX_BATCH_SIZE)) {
             BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
         }
-        if(pluginConfig.hasPath(MAX_RETRY_SIZE)){
+        if (pluginConfig.hasPath(MAX_RETRY_SIZE)) {
             BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
index b6108dc47..dba8b8dd1 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
@@ -27,11 +27,13 @@ public class BulkConfig {
      * once bulk es include max document size
      * {@link SinkConfig#MAX_BATCH_SIZE}
      */
+    @SuppressWarnings("checkstyle:MagicNumber")
     public static int MAX_BATCH_SIZE = 10;
 
     /**
      * the max retry size of bulk es
      * {@link SinkConfig#MAX_RETRY_SIZE}
      */
+    @SuppressWarnings("checkstyle:MagicNumber")
     public static int MAX_RETRY_SIZE = 3;
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index 06c5581bb..0e4c12b9f 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -17,17 +17,18 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializerFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -43,8 +44,8 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer{
     private final IndexTypeSerializer indexTypeSerializer;
 
     public ElasticsearchRowSerializer(ElasticsearchVersion 
elasticsearchVersion, IndexInfo indexInfo, SeaTunnelRowType seaTunnelRowType) {
-        this.indexTypeSerializer = 
IndexTypeSerializerFactory.getIndexTypeSerializer(elasticsearchVersion,indexInfo.getType());
-        this.indexSerializer = 
IndexSerializerFactory.getIndexSerializer(indexInfo.getIndex(),seaTunnelRowType);
+        this.indexTypeSerializer = 
IndexTypeSerializerFactory.getIndexTypeSerializer(elasticsearchVersion, 
indexInfo.getType());
+        this.indexSerializer = 
IndexSerializerFactory.getIndexSerializer(indexInfo.getIndex(), 
seaTunnelRowType);
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
@@ -59,13 +60,13 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer{
 
         StringBuilder sb = new StringBuilder();
 
-        Map<String,String> indexInner = new HashMap<>();
+        Map<String, String> indexInner = new HashMap<>();
         String index = indexSerializer.serialize(row);
-        indexInner.put("_index",index);
+        indexInner.put("_index", index);
         indexTypeSerializer.fillType(indexInner);
 
-        Map<String, Map<String,String>> indexParam = new HashMap<>();
-        indexParam.put("index",indexInner);
+        Map<String, Map<String, String>> indexParam = new HashMap<>();
+        indexParam.put("index", indexInner);
         try {
             sb.append(objectMapper.writeValueAsString(indexParam));
             sb.append("\n");
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
index d1fbae8a4..53300f984 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
 
-
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 public interface SeaTunnelRowSerializer {
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
index 799763235..2ddfc35c5 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
@@ -33,7 +33,7 @@ public class VariableIndexSerializer implements 
IndexSerializer {
     private final String index;
     private final Map<String, Integer> fieldIndexMap;
 
-    private final String NULL_DEFAULT = "null";
+    private final String nullDefault = "null";
 
     public VariableIndexSerializer(SeaTunnelRowType seaTunnelRowType, String 
index, List<String> fieldNames) {
         this.index = index;
@@ -61,7 +61,7 @@ public class VariableIndexSerializer implements 
IndexSerializer {
     private String getValue(int fieldIndex, SeaTunnelRow row) {
         Object valueObj = row.getField(fieldIndex);
         if (valueObj == null) {
-            return NULL_DEFAULT;
+            return nullDefault;
         } else {
             return valueObj.toString();
         }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
index 3e528058e..7d0a395e0 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
 
-
 import java.util.Map;
 
 public interface IndexTypeSerializer {
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
index 878257cb6..f70a54dc1 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
@@ -17,12 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.ES2;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.ES5;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.ES6;
+
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.NotIndexTypeSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.RequiredIndexTypeSerializer;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.*;
-
 public class IndexTypeSerializerFactory {
 
     private static final String DEFAULT_TYPE = "st";
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
index fa5afb5b8..57bfe8116 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
@@ -26,7 +26,6 @@ import java.util.Map;
  */
 public class NotIndexTypeSerializer implements IndexTypeSerializer {
 
-
     @Override
     public void fillType(Map<String, String> indexInner) {
 
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index a5eac83ac..e0f7630cf 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -17,9 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
-import com.google.auto.service.AutoService;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -30,25 +28,25 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.Elasticsear
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
 
-import java.util.Collections;
+import com.google.auto.service.AutoService;
 
+import java.util.Collections;
 
 @AutoService(SeaTunnelSink.class)
 public class ElasticsearchSink implements SeaTunnelSink<SeaTunnelRow, 
ElasticsearchSinkState, ElasticsearchCommitInfo, 
ElasticsearchAggregatedCommitInfo> {
 
 
     private org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig;
-    private SeaTunnelContext seaTunnelContext;
     private SeaTunnelRowType seaTunnelRowType;
 
-
     @Override
     public String getPluginName() {
         return "Elasticsearch";
     }
 
     @Override
-    public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config 
pluginConfig) throws PrepareFailException {
+    public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config 
pluginConfig) throws
+        PrepareFailException {
         this.pluginConfig = pluginConfig;
         SinkConfig.setValue(pluginConfig);
     }
@@ -67,10 +65,4 @@ public class ElasticsearchSink implements 
SeaTunnelSink<SeaTunnelRow, Elasticsea
     public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, 
ElasticsearchSinkState> createWriter(SinkWriter.Context context) {
         return new ElasticsearchSinkWriter(context, seaTunnelRowType, 
pluginConfig, Collections.emptyList());
     }
-
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 048f48f44..d527d90d5 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -30,7 +30,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkEla
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
+
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +44,7 @@ import java.util.Optional;
 /**
  * ElasticsearchSinkWriter is a sink writer that will write {@link 
SeaTunnelRow} to Elasticsearch.
  */
-public class ElasticsearchSinkWriter<ElasticsearchSinkState> implements 
SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> {
+public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements 
SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkStateT> {
 
     private final Context context;
 
@@ -53,12 +55,11 @@ public class 
ElasticsearchSinkWriter<ElasticsearchSinkState> implements SinkWrit
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchSinkWriter.class);
 
-
     public ElasticsearchSinkWriter(
             Context context,
             SeaTunnelRowType seaTunnelRowType,
             Config pluginConfig,
-            List<ElasticsearchSinkState> elasticsearchStates) {
+            List<ElasticsearchSinkStateT> elasticsearchStates) {
         this.context = context;
 
         IndexInfo indexInfo = new IndexInfo(pluginConfig);
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
index 9ccc413ff..097eca897 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
@@ -20,7 +20,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.util;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Matcher;
-
 import java.util.regex.Pattern;
 
 public class RegexUtils {
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 0200362d0..fd9387f2d 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -36,12 +36,12 @@ import com.google.auto.service.AutoService;
 public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
     private Config pluginConfig;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
     private SeaTunnelSchema schema;
 
     @Override
     public Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+        return JobMode.BATCH.equals(jobContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
     }
 
     @Override
@@ -67,7 +67,7 @@ public class FakeSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
index 77b72f004..84827b717 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -47,7 +47,7 @@ public abstract class AbstractFileSink implements 
SeaTunnelSink<SeaTunnelRow, Fi
     private String jobId;
     private Long checkpointId;
     private SeaTunnelRowType seaTunnelRowTypeInfo;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
     private TextFileSinkConfig textFileSinkConfig;
     private SinkFileSystemPlugin sinkFileSystemPlugin;
 
@@ -72,7 +72,7 @@ public abstract class AbstractFileSink implements 
SeaTunnelSink<SeaTunnelRow, Fi
 
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
-        if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) && 
this.getSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE)) {
+        if (!jobContext.getJobMode().equals(JobMode.BATCH) && 
this.getSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE)) {
             throw new RuntimeException("only batch job can overwrite mode");
         }
 
@@ -104,9 +104,9 @@ public abstract class AbstractFileSink implements 
SeaTunnelSink<SeaTunnelRow, Fi
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-        this.jobId = seaTunnelContext.getJobId();
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+        this.jobId = jobContext.getJobId();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index e5bc67ffa..0e5187e5b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -48,13 +48,13 @@ public abstract class BaseFileSink implements 
SeaTunnelSink<SeaTunnelRow, FileSi
     protected HadoopConf hadoopConf;
     protected TextFileSinkConfig textFileSinkConfig;
     protected WriteStrategy writeStrategy;
-    protected SeaTunnelContext seaTunnelContext;
+    protected JobContext jobContext;
     protected String jobId;
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-        this.jobId = seaTunnelContext.getJobId();
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+        this.jobId = jobContext.getJobId();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index bcdca3d49..36bd50ca3 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -49,7 +49,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, 
HiveSinkState, Hive
     private String jobId;
     private Long checkpointId;
     private SeaTunnelRowType seaTunnelRowTypeInfo;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
     private HiveSinkConfig hiveSinkConfig;
 
     @Override
@@ -76,7 +76,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, 
HiveSinkState, Hive
 
     @Override
     public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
-        if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) && 
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
 {
+        if (!jobContext.getJobMode().equals(JobMode.BATCH) && 
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
 {
             throw new RuntimeException("only batch job can overwrite hive 
table");
         }
 
@@ -96,9 +96,9 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, 
HiveSinkState, Hive
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-        this.jobId = seaTunnelContext.getJobId();
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+        this.jobId = jobContext.getJobId();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 039d9b4c7..1f1a8c3fb 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -21,7 +21,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig
 import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -47,8 +46,6 @@ import java.util.List;
 @AutoService(SeaTunnelSource.class)
 public class HiveSource implements SeaTunnelSource<SeaTunnelRow, 
HiveSourceSplit, HiveSourceState> {
 
-    private SeaTunnelContext seaTunnelContext;
-
     private SeaTunnelRowType typeInfo;
 
     private ReadStrategy readStrategy;
@@ -85,11 +82,6 @@ public class HiveSource implements 
SeaTunnelSource<SeaTunnelRow, HiveSourceSplit
         }
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
         return this.typeInfo;
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index cd52b19a6..dff630b12 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.source;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -45,7 +45,7 @@ import com.google.auto.service.AutoService;
 public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     protected final HttpParameter httpParameter = new HttpParameter();
     protected SeaTunnelRowType rowType;
-    protected SeaTunnelContext seaTunnelContext;
+    protected JobContext jobContext;
     protected DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
     @Override
@@ -55,7 +55,7 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+        return JobMode.BATCH.equals(jobContext.getJobMode()) ? 
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
     }
 
     @Override
@@ -84,8 +84,8 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
index 2ca69d784..f1cbb619a 100644
--- 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
@@ -25,7 +25,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceCo
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -48,8 +47,6 @@ import java.io.IOException;
 @AutoService(SeaTunnelSource.class)
 public class HudiSource implements SeaTunnelSource<SeaTunnelRow, 
HudiSourceSplit, HudiSourceState> {
 
-    private SeaTunnelContext seaTunnelContext;
-
     private SeaTunnelRowType typeInfo;
 
     private String filePath;
@@ -103,11 +100,6 @@ public class HudiSource implements 
SeaTunnelSource<SeaTunnelRow, HudiSourceSplit
 
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
         return this.typeInfo;
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
index a7c052592..668d2861d 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
@@ -21,8 +21,8 @@ import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfi
 import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NODE_URLS;
 import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -46,7 +46,7 @@ import java.util.Map;
 @AutoService(SeaTunnelSource.class)
 public class IoTDBSource implements SeaTunnelSource<SeaTunnelRow, 
IoTDBSourceSplit, IoTDBSourceState> {
 
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     private SeaTunnelRowType typeInfo;
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
index 3d2a82b3d..4f51c31a5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 
 import javax.transaction.xa.Xid;
@@ -63,7 +63,7 @@ class SemanticXidGenerator
     }
 
     @Override
-    public Xid generateXid(SeaTunnelContext context, SinkWriter.Context 
sinkContext, long checkpointId) {
+    public Xid generateXid(JobContext context, SinkWriter.Context sinkContext, 
long checkpointId) {
         byte[] jobIdBytes = context.getJobId().getBytes();
         checkArgument(jobIdBytes.length <= JOB_ID_BYTES);
         System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, JOB_ID_BYTES);
@@ -75,7 +75,7 @@ class SemanticXidGenerator
     }
 
     @Override
-    public boolean belongsToSubtask(Xid xid, SeaTunnelContext context, 
SinkWriter.Context sinkContext) {
+    public boolean belongsToSubtask(Xid xid, JobContext context, 
SinkWriter.Context sinkContext) {
         if (xid.getFormatId() != FORMAT_ID) {
             return false;
         }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
index e37e6b05a..11ebf0633 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
 
@@ -38,6 +38,6 @@ public interface XaGroupOps
 
     GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids);
 
-    void recoverAndRollback(SeaTunnelContext context, SinkWriter.Context 
sinkContext, XidGenerator xidGenerator, Xid excludeXid);
+    void recoverAndRollback(JobContext context, SinkWriter.Context 
sinkContext, XidGenerator xidGenerator, Xid excludeXid);
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
index 05ecce160..ff2012fd7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
 
@@ -112,7 +112,7 @@ public class XaGroupOpsImpl
     }
 
     @Override
-    public void recoverAndRollback(SeaTunnelContext context, 
SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
+    public void recoverAndRollback(JobContext context, SinkWriter.Context 
sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
         Collection<Xid> recovered = xaFacade.recover();
         recovered.remove(excludeXid);
         if (recovered.isEmpty()) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
index a80175054..de3ef0059 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 
 import javax.transaction.xa.Xid;
@@ -31,14 +31,14 @@ import java.security.SecureRandom;
 public interface XidGenerator
     extends Serializable, AutoCloseable {
 
-    Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext, 
long checkpointId);
+    Xid generateXid(JobContext context, SinkWriter.Context sinkContext, long 
checkpointId);
 
     default void open() {}
 
     /**
      * @return true if the provided transaction belongs to this subtask
      */
-    boolean belongsToSubtask(Xid xid, SeaTunnelContext context, 
SinkWriter.Context sinkContext);
+    boolean belongsToSubtask(Xid xid, JobContext context, SinkWriter.Context 
sinkContext);
 
     @Override
     default void close() {}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index b4527a9ff..20461db9f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
@@ -53,7 +53,7 @@ public class JdbcExactlyOnceSinkWriter
 
     private final SinkWriter.Context sinkcontext;
 
-    private final SeaTunnelContext context;
+    private final JobContext context;
 
     private final List<JdbcSinkState> recoverStates;
 
@@ -72,7 +72,7 @@ public class JdbcExactlyOnceSinkWriter
 
     public JdbcExactlyOnceSinkWriter(
         SinkWriter.Context sinkcontext,
-        SeaTunnelContext context,
+        JobContext context,
         JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
         JdbcSinkOptions jdbcSinkOptions,
         List<JdbcSinkState> states) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 672303f8c..97a63b980 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -51,7 +51,7 @@ public class JdbcSink
 
     private SeaTunnelRowType seaTunnelRowType;
 
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     private JdbcSinkOptions jdbcSinkOptions;
 
@@ -76,7 +76,7 @@ public class JdbcSink
         if (jdbcSinkOptions.isExactlyOnce()) {
             sinkWriter = new JdbcExactlyOnceSinkWriter(
                 context,
-                seaTunnelContext,
+                jobContext,
                 statementBuilder,
                 jdbcSinkOptions,
                 new ArrayList<>()
@@ -98,7 +98,7 @@ public class JdbcSink
             JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) -> 
JdbcUtils.setRecordToStatement(st, null, row);
             return new JdbcExactlyOnceSinkWriter(
                 context,
-                seaTunnelContext,
+                jobContext,
                 statementBuilder,
                 jdbcSinkOptions,
                 states
@@ -132,8 +132,8 @@ public class JdbcSink
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 2717436f1..a8d0766f3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -57,7 +56,6 @@ import java.util.Map;
 public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, 
JdbcSourceSplit, JdbcSourceState> {
     protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
 
-    private SeaTunnelContext seaTunnelContext;
     private JdbcSourceOptions jdbcSourceOptions;
     private SeaTunnelRowType typeInfo;
 
@@ -96,11 +94,6 @@ public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
         );
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index a4d534e67..62314b87a 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -23,8 +23,8 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONS
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -53,11 +53,11 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
 
     private final ConsumerMetadata metadata = new ConsumerMetadata();
     private SeaTunnelRowType typeInfo;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     @Override
     public Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+        return JobMode.BATCH.equals(jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
     }
 
     @Override
@@ -119,7 +119,7 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index 65b004a6a..c10796a21 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.kudu.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.Boundedness;
@@ -53,7 +52,6 @@ import java.util.List;
 public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSourceState> {
     private static final Logger LOGGER = LoggerFactory.getLogger(KuduSource.class);
 
-    private SeaTunnelContext seaTunnelContext;
     private SeaTunnelRowType rowTypeInfo;
     private KuduInputFormat kuduInputFormat;
     private PartitionParameter partitionParameter;
@@ -167,11 +165,6 @@ public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
         return new PartitionParameter(keyColumn, Long.parseLong(minKey + ""), Long.parseLong(maxKey + ""));
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
         ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
         ArrayList<String> fieldNames = new ArrayList<>();
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
index 077097569..45c79048b 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
@@ -25,7 +25,6 @@ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbCo
 import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -50,8 +49,6 @@ import com.google.auto.service.AutoService;
 @AutoService(SeaTunnelSource.class)
 public class MongodbSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
-    private SeaTunnelContext seaTunnelContext;
-
     private SeaTunnelRowType rowType;
 
     private MongodbParameters params;
@@ -91,11 +88,6 @@ public class MongodbSource extends AbstractSingleSplitSource<SeaTunnelRow> {
         }
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index d860e6af5..407d32222 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.redis.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -43,7 +42,6 @@ import com.google.auto.service.AutoService;
 @AutoService(SeaTunnelSource.class)
 public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     private final RedisParameters redisParameters = new RedisParameters();
-    private SeaTunnelContext seaTunnelContext;
     private SeaTunnelRowType seaTunnelRowType;
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
@@ -87,11 +85,6 @@ public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> {
         return seaTunnelRowType;
     }
 
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
         return new RedisSourceReader(redisParameters, readerContext, deserializationSchema);
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
index a237679d4..f97e00e93 100644
--- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.socket.source;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -38,11 +38,11 @@ import com.google.auto.service.AutoService;
 @AutoService(SeaTunnelSource.class)
 public class SocketSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     private SocketSourceParameter parameter;
-    private SeaTunnelContext seaTunnelContext;
+    private JobContext jobContext;
 
     @Override
     public Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+        return JobMode.BATCH.equals(jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
     }
 
     @Override
@@ -56,8 +56,8 @@ public class SocketSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
     }
 
     @Override
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 5e3178bfb..1b680b9e8 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.util.TableUtil;
@@ -41,6 +42,7 @@ public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecute
 
     protected final FlinkEnvironment flinkEnvironment;
     protected final List<? extends Config> pluginConfigs;
+    protected final JobContext jobContext;
     protected final List<T> plugins;
     protected static final String ENGINE_TYPE = "seatunnel";
     protected static final String PLUGIN_NAME = "plugin_name";
@@ -57,8 +59,10 @@ public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecute
     };
 
     protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                             JobContext jobContext,
                                              List<? extends Config> pluginConfigs) {
         this.flinkEnvironment = flinkEnvironment;
+        this.jobContext = jobContext;
         this.pluginConfigs = pluginConfigs;
         this.plugins = initializePlugins(pluginConfigs);
     }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index df452dd85..e6f5719fa 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.config.EngineType;
 import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
@@ -55,10 +55,11 @@ public class FlinkExecution implements TaskExecution {
     public FlinkExecution(Config config) {
         this.config = config;
         this.flinkEnvironment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
-        SeaTunnelContext.getContext().setJobMode(flinkEnvironment.getJobMode());
-        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
-        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
-        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(flinkEnvironment.getJobMode());
+        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList("source"));
+        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList("transform"));
+        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList("sink"));
         registerPlugin();
     }
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 8de5d422d..6a06516d5 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -46,8 +46,9 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
     private static final String PLUGIN_TYPE = "sink";
 
     protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                   JobContext jobContext,
                                    List<? extends Config> pluginConfigs) {
-        super(flinkEnvironment, pluginConfigs);
+        super(flinkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override
@@ -60,7 +61,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                 sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
-            seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+            seaTunnelSink.setJobContext(jobContext);
             return seaTunnelSink;
         }).distinct().collect(Collectors.toList());
         flinkEnvironment.registerPlugin(pluginJars);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index a1b31836f..1dc378a83 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.common.constants.JobMode;
@@ -53,8 +53,9 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
     private static final String PLUGIN_TYPE = "source";
 
     public SourceExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                  JobContext jobContext,
                                   List<? extends Config> sourceConfigs) {
-        super(flinkEnvironment, sourceConfigs);
+        super(flinkEnvironment, jobContext, sourceConfigs);
     }
 
     @Override
@@ -110,8 +111,8 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
-            seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
-            if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
+            seaTunnelSource.setJobContext(jobContext);
+            if (jobContext.getJobMode() == JobMode.BATCH
                 && seaTunnelSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
                 throw new UnsupportedOperationException(String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName()));
             }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index d077f6d3a..62d65fbcd 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
@@ -39,8 +40,9 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Fl
     private static final String PLUGIN_TYPE = "transform";
 
     protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                        JobContext jobContext,
                                         List<? extends Config> pluginConfigs) {
-        super(flinkEnvironment, pluginConfigs);
+        super(flinkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
index 0dad6f680..af087c0c4 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -34,13 +35,16 @@ public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecute
 
     protected final SparkEnvironment sparkEnvironment;
     protected final List<? extends Config> pluginConfigs;
+    protected final JobContext jobContext;
     protected final List<T> plugins;
     protected static final String ENGINE_TYPE = "seatunnel";
     protected static final String PLUGIN_NAME = "plugin_name";
 
     protected AbstractPluginExecuteProcessor(SparkEnvironment sparkEnvironment,
+                                             JobContext jobContext,
                                              List<? extends Config> pluginConfigs) {
         this.sparkEnvironment = sparkEnvironment;
+        this.jobContext = jobContext;
         this.pluginConfigs = pluginConfigs;
         this.plugins = initializePlugins(pluginConfigs);
     }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 45ab01012..fbcecd7c8 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -43,8 +43,9 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
     private static final String PLUGIN_TYPE = "sink";
 
     protected SinkExecuteProcessor(SparkEnvironment sparkEnvironment,
+                                   JobContext jobContext,
                                    List<? extends Config> pluginConfigs) {
-        super(sparkEnvironment, pluginConfigs);
+        super(sparkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override
@@ -56,7 +57,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
-            seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+            seaTunnelSink.setJobContext(jobContext);
             return seaTunnelSink;
         }).distinct().collect(Collectors.toList());
         sparkEnvironment.registerPlugin(pluginJars);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 8dfcb4ef4..f6eafe67c 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -44,8 +44,9 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
     private static final String PLUGIN_TYPE = "source";
 
     public SourceExecuteProcessor(SparkEnvironment sparkEnvironment,
+                                  JobContext jobContext,
                                   List<? extends Config> sourceConfigs) {
-        super(sparkEnvironment, sourceConfigs);
+        super(sparkEnvironment, jobContext, sourceConfigs);
     }
 
     @Override
@@ -75,7 +76,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSource<?, ?, ?> seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
-            seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+            seaTunnelSource.setJobContext(jobContext);
             sources.add(seaTunnelSource);
         }
         sparkEnvironment.registerPlugin(new ArrayList<>(jars));
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 31193e0cf..c1d0c9c22 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.core.starter.config.EngineType;
 import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
@@ -47,10 +47,11 @@ public class SparkExecution {
     public SparkExecution(Config config) {
         this.config = config;
         this.sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
-        SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
-        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.SOURCE));
-        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.TRANSFORM));
-        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.SINK));
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(sparkEnvironment.getJobMode());
+        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, jobContext, config.getConfigList(Constants.SOURCE));
+        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(sparkEnvironment, jobContext, config.getConfigList(Constants.TRANSFORM));
+        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(sparkEnvironment, jobContext, config.getConfigList(Constants.SINK));
     }
 
     public void execute() throws TaskExecuteException {
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 5668457b7..66d5e1ee6 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
@@ -39,8 +40,9 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Ba
     private static final String PLUGIN_TYPE = "transform";
 
     protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment,
+                                        JobContext jobContext,
                                         List<? extends Config> pluginConfigs) {
-        super(sparkEnvironment, pluginConfigs);
+        super(sparkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override

Reply via email to