This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cbf82f755 [DEV][Api] Replace SeaTunnelContext with JobContext and
remove singleton pattern (#2706)
cbf82f755 is described below
commit cbf82f755ca3f905b6b0fad8c2ab2d0d1c5bf36a
Author: ic4y <[email protected]>
AuthorDate: Wed Sep 14 16:36:58 2022 +0800
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton
pattern (#2706)
* remove singleton pattern for SeaTunnelContext
* rename SeaTunnelContext to JobContext and fix checkstyle
* fix checkstyle
* put the constructor on top
---
.../{SeaTunnelContext.java => JobContext.java} | 16 +++++--------
.../apache/seatunnel/api/sink/SeaTunnelSink.java | 4 ++--
...nelContextAware.java => SeaTunnelJobAware.java} | 8 +++----
.../seatunnel/api/source/SeaTunnelSource.java | 2 +-
.../seatunnel/assertion/sink/AssertSink.java | 7 ------
.../clickhouse/sink/client/ClickhouseSink.java | 6 -----
.../clickhouse/sink/file/ClickhouseFileSink.java | 7 ------
.../elasticsearch/client/EsRestClient.java | 26 ++++++++++++----------
.../seatunnel/elasticsearch/config/SinkConfig.java | 6 ++---
.../elasticsearch/constant/BulkConfig.java | 2 ++
.../serialize/ElasticsearchRowSerializer.java | 19 ++++++++--------
.../serialize/SeaTunnelRowSerializer.java | 1 -
.../index/impl/VariableIndexSerializer.java | 4 ++--
.../serialize/type/IndexTypeSerializer.java | 1 -
.../serialize/type/IndexTypeSerializerFactory.java | 6 +++--
.../type/impl/NotIndexTypeSerializer.java | 1 -
.../elasticsearch/sink/ElasticsearchSink.java | 16 ++++---------
.../sink/ElasticsearchSinkWriter.java | 7 +++---
.../seatunnel/elasticsearch/util/RegexUtils.java | 1 -
.../seatunnel/fake/source/FakeSource.java | 10 ++++-----
.../seatunnel/file/sink/AbstractFileSink.java | 12 +++++-----
.../seatunnel/file/sink/BaseFileSink.java | 10 ++++-----
.../connectors/seatunnel/hive/sink/HiveSink.java | 12 +++++-----
.../seatunnel/hive/source/HiveSource.java | 8 -------
.../seatunnel/http/source/HttpSource.java | 10 ++++-----
.../seatunnel/hudi/source/HudiSource.java | 8 -------
.../seatunnel/iotdb/source/IoTDBSource.java | 4 ++--
.../jdbc/internal/xa/SemanticXidGenerator.java | 6 ++---
.../seatunnel/jdbc/internal/xa/XaGroupOps.java | 4 ++--
.../seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java | 4 ++--
.../seatunnel/jdbc/internal/xa/XidGenerator.java | 6 ++---
.../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 6 ++---
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 12 +++++-----
.../seatunnel/jdbc/source/JdbcSource.java | 7 ------
.../seatunnel/kafka/source/KafkaSource.java | 10 ++++-----
.../seatunnel/kudu/source/KuduSource.java | 7 ------
.../seatunnel/mongodb/source/MongodbSource.java | 8 -------
.../seatunnel/redis/source/RedisSource.java | 7 ------
.../seatunnel/socket/source/SocketSource.java | 10 ++++-----
.../execution/AbstractPluginExecuteProcessor.java | 4 ++++
.../starter/flink/execution/FlinkExecution.java | 11 ++++-----
.../flink/execution/SinkExecuteProcessor.java | 7 +++---
.../flink/execution/SourceExecuteProcessor.java | 9 ++++----
.../flink/execution/TransformExecuteProcessor.java | 4 +++-
.../execution/AbstractPluginExecuteProcessor.java | 4 ++++
.../spark/execution/SinkExecuteProcessor.java | 7 +++---
.../spark/execution/SourceExecuteProcessor.java | 7 +++---
.../starter/spark/execution/SparkExecution.java | 11 ++++-----
.../spark/execution/TransformExecuteProcessor.java | 4 +++-
49 files changed, 156 insertions(+), 213 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
similarity index 84%
rename from
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
index b6152e44f..9e56de36a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
@@ -28,14 +28,12 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
- * This class is used to store the context of the application. e.g. the table
schema, catalog...etc.
+ * This class is used to store the context of the job. e.g. the table schema,
catalog...etc.
*/
-public final class SeaTunnelContext implements Serializable {
+public final class JobContext implements Serializable {
private static final long serialVersionUID = -1L;
- private static final SeaTunnelContext INSTANCE = new SeaTunnelContext();
-
// tableName -> tableSchema
private final Map<String, TableSchema> tableSchemaMap = new
ConcurrentHashMap<>(Common.COLLECTION_SIZE);
@@ -43,8 +41,8 @@ public final class SeaTunnelContext implements Serializable {
private final String jobId;
- public static SeaTunnelContext getContext() {
- return INSTANCE;
+ public JobContext() {
+ this.jobId = UUID.randomUUID().toString().replace("-", "");
}
/**
@@ -67,7 +65,7 @@ public final class SeaTunnelContext implements Serializable {
return Optional.ofNullable(tableSchemaMap.get(tableName));
}
- public SeaTunnelContext setJobMode(JobMode jobMode) {
+ public JobContext setJobMode(JobMode jobMode) {
this.jobMode = jobMode;
return this;
}
@@ -80,8 +78,4 @@ public final class SeaTunnelContext implements Serializable {
return this.jobId;
}
- private SeaTunnelContext() {
- this.jobId = UUID.randomUUID().toString().replace("-", "");
- }
-
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 59517e409..e09d8110a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.api.sink;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.source.SeaTunnelContextAware;
+import org.apache.seatunnel.api.source.SeaTunnelJobAware;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -43,7 +43,7 @@ import java.util.Optional;
* {@link SinkAggregatedCommitter} handle it,
this class should implement interface {@link Serializable}.
*/
public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
- extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle,
SeaTunnelContextAware {
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle,
SeaTunnelJobAware {
/**
* Set the row type info of sink row data. This method will be
automatically called by translation.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
similarity index 83%
rename from
seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
index 429f05155..8eb90b04f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
@@ -17,14 +17,14 @@
package org.apache.seatunnel.api.source;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
/**
- * This interface defines the runtime environment of the SeaTunnel application.
+ * This interface defines the runtime environment of the SeaTunnel job.
*/
-public interface SeaTunnelContextAware {
+public interface SeaTunnelJobAware {
- default void setSeaTunnelContext(SeaTunnelContext seaTunnelContext){
+ default void setJobContext(JobContext jobContext){
// nothing
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index f93f4d3bf..6edac04a8 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
* @param <StateT> The type of checkpoint states.
*/
public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends
Serializable>
- extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle,
SeaTunnelContextAware {
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle,
SeaTunnelJobAware {
/**
* Get the boundedness of this source.
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index dc0c397d6..11c17a1a4 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -41,7 +40,6 @@ import java.util.List;
@AutoService(SeaTunnelSink.class)
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private static final String RULES = "rules";
- private SeaTunnelContext seaTunnelContext;
private SeaTunnelRowType seaTunnelRowType;
private List<AssertFieldRule> assertFieldRules;
@@ -73,11 +71,6 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
assertFieldRules = new AssertRuleParser().parseRules(configList);
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public String getPluginName() {
return "Assert";
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index c296ca03a..3c2c1104d 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -29,7 +29,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -67,7 +66,6 @@ import java.util.Properties;
@AutoService(SeaTunnelSink.class)
public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow,
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
- private SeaTunnelContext seaTunnelContext;
private ReaderOption option;
@Override
@@ -194,8 +192,4 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
return this.option.getSeaTunnelRowType();
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 62694c3fe..319937ac1 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -30,7 +30,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -65,7 +64,6 @@ import java.util.stream.Collectors;
@AutoService(SeaTunnelSink.class)
public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow,
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
- private SeaTunnelContext seaTunnelContext;
private FileReaderOption readerOption;
@Override
@@ -140,9 +138,4 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState>
createWriter(SinkWriter.Context context) throws IOException {
return new ClickhouseFileSinkWriter(readerOption, context);
}
-
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 5a2a3df09..661d5b7ca 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
@@ -27,9 +31,6 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
@@ -40,13 +41,14 @@ import java.util.List;
public class EsRestClient {
- private static EsRestClient esRestClient;
- private static RestClient restClient;
+ private static EsRestClient ES_REST_CLIENT;
+ private static RestClient REST_CLIENT;
private EsRestClient() {
}
+ @SuppressWarnings("checkstyle:MagicNumber")
private static RestClientBuilder getRestClientBuilder(List<String> hosts,
String username, String password) {
HttpHost[] httpHosts = new HttpHost[hosts.size()];
for (int i = 0; i < hosts.size(); i++) {
@@ -68,19 +70,19 @@ public class EsRestClient {
}
public static EsRestClient getInstance(List<String> hosts, String
username, String password) {
- if (restClient == null) {
+ if (REST_CLIENT == null) {
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts,
username, password);
- restClient = restClientBuilder.build();
- esRestClient = new EsRestClient();
+ REST_CLIENT = restClientBuilder.build();
+ ES_REST_CLIENT = new EsRestClient();
}
- return esRestClient;
+ return ES_REST_CLIENT;
}
public BulkResponse bulk(String requestBody) {
Request request = new Request("POST", "_bulk");
request.setJsonEntity(requestBody);
try {
- Response response = restClient.performRequest(request);
+ Response response = REST_CLIENT.performRequest(request);
if (response == null) {
throw new BulkElasticsearchException("bulk es Response is
null");
}
@@ -105,7 +107,7 @@ public class EsRestClient {
public static String getClusterVersion() {
Request request = new Request("GET", "/");
try {
- Response response = restClient.performRequest(request);
+ Response response = REST_CLIENT.performRequest(request);
String result = EntityUtils.toString(response.getEntity());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(result);
@@ -117,7 +119,7 @@ public class EsRestClient {
}
public void close() throws IOException {
- restClient.close();
+ REST_CLIENT.close();
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index f747fad85..6dc753bb2 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -35,11 +35,11 @@ public class SinkConfig {
public static final String MAX_RETRY_SIZE = "max_retry_size";
- public static void
setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig){
- if(pluginConfig.hasPath(MAX_BATCH_SIZE)){
+ public static void
setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
+ if (pluginConfig.hasPath(MAX_BATCH_SIZE)) {
BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
}
- if(pluginConfig.hasPath(MAX_RETRY_SIZE)){
+ if (pluginConfig.hasPath(MAX_RETRY_SIZE)) {
BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
index b6108dc47..dba8b8dd1 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
@@ -27,11 +27,13 @@ public class BulkConfig {
* once bulk es include max document size
* {@link SinkConfig#MAX_BATCH_SIZE}
*/
+ @SuppressWarnings("checkstyle:MagicNumber")
public static int MAX_BATCH_SIZE = 10;
/**
* the max retry size of bulk es
* {@link SinkConfig#MAX_RETRY_SIZE}
*/
+ @SuppressWarnings("checkstyle:MagicNumber")
public static int MAX_RETRY_SIZE = 3;
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index 06c5581bb..0e4c12b9f 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -17,17 +17,18 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializerFactory;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializerFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.util.HashMap;
import java.util.Map;
@@ -43,8 +44,8 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer{
private final IndexTypeSerializer indexTypeSerializer;
public ElasticsearchRowSerializer(ElasticsearchVersion
elasticsearchVersion, IndexInfo indexInfo, SeaTunnelRowType seaTunnelRowType) {
- this.indexTypeSerializer =
IndexTypeSerializerFactory.getIndexTypeSerializer(elasticsearchVersion,indexInfo.getType());
- this.indexSerializer =
IndexSerializerFactory.getIndexSerializer(indexInfo.getIndex(),seaTunnelRowType);
+ this.indexTypeSerializer =
IndexTypeSerializerFactory.getIndexTypeSerializer(elasticsearchVersion,
indexInfo.getType());
+ this.indexSerializer =
IndexSerializerFactory.getIndexSerializer(indexInfo.getIndex(),
seaTunnelRowType);
this.seaTunnelRowType = seaTunnelRowType;
}
@@ -59,13 +60,13 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer{
StringBuilder sb = new StringBuilder();
- Map<String,String> indexInner = new HashMap<>();
+ Map<String, String> indexInner = new HashMap<>();
String index = indexSerializer.serialize(row);
- indexInner.put("_index",index);
+ indexInner.put("_index", index);
indexTypeSerializer.fillType(indexInner);
- Map<String, Map<String,String>> indexParam = new HashMap<>();
- indexParam.put("index",indexInner);
+ Map<String, Map<String, String>> indexParam = new HashMap<>();
+ indexParam.put("index", indexInner);
try {
sb.append(objectMapper.writeValueAsString(indexParam));
sb.append("\n");
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
index d1fbae8a4..53300f984 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
-
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
public interface SeaTunnelRowSerializer {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
index 799763235..2ddfc35c5 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
@@ -33,7 +33,7 @@ public class VariableIndexSerializer implements
IndexSerializer {
private final String index;
private final Map<String, Integer> fieldIndexMap;
- private final String NULL_DEFAULT = "null";
+ private final String nullDefault = "null";
public VariableIndexSerializer(SeaTunnelRowType seaTunnelRowType, String
index, List<String> fieldNames) {
this.index = index;
@@ -61,7 +61,7 @@ public class VariableIndexSerializer implements
IndexSerializer {
private String getValue(int fieldIndex, SeaTunnelRow row) {
Object valueObj = row.getField(fieldIndex);
if (valueObj == null) {
- return NULL_DEFAULT;
+ return nullDefault;
} else {
return valueObj.toString();
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
index 3e528058e..7d0a395e0 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
-
import java.util.Map;
public interface IndexTypeSerializer {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
index 878257cb6..f70a54dc1 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
@@ -17,12 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.ES2;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.ES5;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.ES6;
+
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.NotIndexTypeSerializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.RequiredIndexTypeSerializer;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.*;
-
public class IndexTypeSerializerFactory {
private static final String DEFAULT_TYPE = "st";
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
index fa5afb5b8..57bfe8116 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
@@ -26,7 +26,6 @@ import java.util.Map;
*/
public class NotIndexTypeSerializer implements IndexTypeSerializer {
-
@Override
public void fillType(Map<String, String> indexInner) {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index a5eac83ac..e0f7630cf 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -17,9 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
-import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -30,25 +28,25 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.Elasticsear
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
-import java.util.Collections;
+import com.google.auto.service.AutoService;
+import java.util.Collections;
@AutoService(SeaTunnelSink.class)
public class ElasticsearchSink implements SeaTunnelSink<SeaTunnelRow,
ElasticsearchSinkState, ElasticsearchCommitInfo,
ElasticsearchAggregatedCommitInfo> {
private org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig;
- private SeaTunnelContext seaTunnelContext;
private SeaTunnelRowType seaTunnelRowType;
-
@Override
public String getPluginName() {
return "Elasticsearch";
}
@Override
- public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config
pluginConfig) throws PrepareFailException {
+ public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config
pluginConfig) throws
+ PrepareFailException {
this.pluginConfig = pluginConfig;
SinkConfig.setValue(pluginConfig);
}
@@ -67,10 +65,4 @@ public class ElasticsearchSink implements
SeaTunnelSink<SeaTunnelRow, Elasticsea
public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo,
ElasticsearchSinkState> createWriter(SinkWriter.Context context) {
return new ElasticsearchSinkWriter(context, seaTunnelRowType,
pluginConfig, Collections.emptyList());
}
-
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 048f48f44..d527d90d5 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -30,7 +30,9 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkEla
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +44,7 @@ import java.util.Optional;
/**
* ElasticsearchSinkWriter is a sink writer that will write {@link
SeaTunnelRow} to Elasticsearch.
*/
-public class ElasticsearchSinkWriter<ElasticsearchSinkState> implements
SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> {
+public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements
SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkStateT> {
private final Context context;
@@ -53,12 +55,11 @@ public class
ElasticsearchSinkWriter<ElasticsearchSinkState> implements SinkWrit
private static final Logger LOGGER =
LoggerFactory.getLogger(ElasticsearchSinkWriter.class);
-
public ElasticsearchSinkWriter(
Context context,
SeaTunnelRowType seaTunnelRowType,
Config pluginConfig,
- List<ElasticsearchSinkState> elasticsearchStates) {
+ List<ElasticsearchSinkStateT> elasticsearchStates) {
this.context = context;
IndexInfo indexInfo = new IndexInfo(pluginConfig);
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
index 9ccc413ff..097eca897 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
@@ -20,7 +20,6 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.util;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
-
import java.util.regex.Pattern;
public class RegexUtils {
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 0200362d0..fd9387f2d 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -36,12 +36,12 @@ import com.google.auto.service.AutoService;
public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private Config pluginConfig;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
private SeaTunnelSchema schema;
@Override
public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ return JobMode.BATCH.equals(jobContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}
@Override
@@ -67,7 +67,7 @@ public class FakeSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
index 77b72f004..84827b717 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -47,7 +47,7 @@ public abstract class AbstractFileSink implements
SeaTunnelSink<SeaTunnelRow, Fi
private String jobId;
private Long checkpointId;
private SeaTunnelRowType seaTunnelRowTypeInfo;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
private TextFileSinkConfig textFileSinkConfig;
private SinkFileSystemPlugin sinkFileSystemPlugin;
@@ -72,7 +72,7 @@ public abstract class AbstractFileSink implements
SeaTunnelSink<SeaTunnelRow, Fi
@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
createWriter(SinkWriter.Context context) throws IOException {
- if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) &&
this.getSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE)) {
+ if (!jobContext.getJobMode().equals(JobMode.BATCH) &&
this.getSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE)) {
throw new RuntimeException("only batch job can overwrite mode");
}
@@ -104,9 +104,9 @@ public abstract class AbstractFileSink implements
SeaTunnelSink<SeaTunnelRow, Fi
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- this.jobId = seaTunnelContext.getJobId();
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+ this.jobId = jobContext.getJobId();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index e5bc67ffa..0e5187e5b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -48,13 +48,13 @@ public abstract class BaseFileSink implements
SeaTunnelSink<SeaTunnelRow, FileSi
protected HadoopConf hadoopConf;
protected TextFileSinkConfig textFileSinkConfig;
protected WriteStrategy writeStrategy;
- protected SeaTunnelContext seaTunnelContext;
+ protected JobContext jobContext;
protected String jobId;
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- this.jobId = seaTunnelContext.getJobId();
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+ this.jobId = jobContext.getJobId();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index bcdca3d49..36bd50ca3 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -49,7 +49,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow,
HiveSinkState, Hive
private String jobId;
private Long checkpointId;
private SeaTunnelRowType seaTunnelRowTypeInfo;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
private HiveSinkConfig hiveSinkConfig;
@Override
@@ -76,7 +76,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow,
HiveSinkState, Hive
@Override
public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState>
createWriter(SinkWriter.Context context) throws IOException {
- if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) &&
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
{
+ if (!jobContext.getJobMode().equals(JobMode.BATCH) &&
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
{
throw new RuntimeException("only batch job can overwrite hive
table");
}
@@ -96,9 +96,9 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow,
HiveSinkState, Hive
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- this.jobId = seaTunnelContext.getJobId();
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+ this.jobId = jobContext.getJobId();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 039d9b4c7..1f1a8c3fb 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -21,7 +21,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -47,8 +46,6 @@ import java.util.List;
@AutoService(SeaTunnelSource.class)
public class HiveSource implements SeaTunnelSource<SeaTunnelRow,
HiveSourceSplit, HiveSourceState> {
- private SeaTunnelContext seaTunnelContext;
-
private SeaTunnelRowType typeInfo;
private ReadStrategy readStrategy;
@@ -85,11 +82,6 @@ public class HiveSource implements
SeaTunnelSource<SeaTunnelRow, HiveSourceSplit
}
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.typeInfo;
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index cd52b19a6..dff630b12 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.http.source;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -45,7 +45,7 @@ import com.google.auto.service.AutoService;
public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
protected final HttpParameter httpParameter = new HttpParameter();
protected SeaTunnelRowType rowType;
- protected SeaTunnelContext seaTunnelContext;
+ protected JobContext jobContext;
protected DeserializationSchema<SeaTunnelRow> deserializationSchema;
@Override
@@ -55,7 +55,7 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ return JobMode.BATCH.equals(jobContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}
@Override
@@ -84,8 +84,8 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
index 2ca69d784..f1cbb619a 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
@@ -25,7 +25,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceCo
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -48,8 +47,6 @@ import java.io.IOException;
@AutoService(SeaTunnelSource.class)
public class HudiSource implements SeaTunnelSource<SeaTunnelRow,
HudiSourceSplit, HudiSourceState> {
- private SeaTunnelContext seaTunnelContext;
-
private SeaTunnelRowType typeInfo;
private String filePath;
@@ -103,11 +100,6 @@ public class HudiSource implements
SeaTunnelSource<SeaTunnelRow, HudiSourceSplit
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.typeInfo;
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
index a7c052592..668d2861d 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
@@ -21,8 +21,8 @@ import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfi
import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NODE_URLS;
import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -46,7 +46,7 @@ import java.util.Map;
@AutoService(SeaTunnelSource.class)
public class IoTDBSource implements SeaTunnelSource<SeaTunnelRow,
IoTDBSourceSplit, IoTDBSourceState> {
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
private SeaTunnelRowType typeInfo;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
index 3d2a82b3d..4f51c31a5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
import static com.google.common.base.Preconditions.checkArgument;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import javax.transaction.xa.Xid;
@@ -63,7 +63,7 @@ class SemanticXidGenerator
}
@Override
- public Xid generateXid(SeaTunnelContext context, SinkWriter.Context
sinkContext, long checkpointId) {
+ public Xid generateXid(JobContext context, SinkWriter.Context sinkContext,
long checkpointId) {
byte[] jobIdBytes = context.getJobId().getBytes();
checkArgument(jobIdBytes.length <= JOB_ID_BYTES);
System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, JOB_ID_BYTES);
@@ -75,7 +75,7 @@ class SemanticXidGenerator
}
@Override
- public boolean belongsToSubtask(Xid xid, SeaTunnelContext context,
SinkWriter.Context sinkContext) {
+ public boolean belongsToSubtask(Xid xid, JobContext context,
SinkWriter.Context sinkContext) {
if (xid.getFormatId() != FORMAT_ID) {
return false;
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
index e37e6b05a..11ebf0633 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
@@ -38,6 +38,6 @@ public interface XaGroupOps
GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids);
- void recoverAndRollback(SeaTunnelContext context, SinkWriter.Context
sinkContext, XidGenerator xidGenerator, Xid excludeXid);
+ void recoverAndRollback(JobContext context, SinkWriter.Context
sinkContext, XidGenerator xidGenerator, Xid excludeXid);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
index 05ecce160..ff2012fd7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
@@ -112,7 +112,7 @@ public class XaGroupOpsImpl
}
@Override
- public void recoverAndRollback(SeaTunnelContext context,
SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
+ public void recoverAndRollback(JobContext context, SinkWriter.Context
sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
Collection<Xid> recovered = xaFacade.recover();
recovered.remove(excludeXid);
if (recovered.isEmpty()) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
index a80175054..de3ef0059 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import javax.transaction.xa.Xid;
@@ -31,14 +31,14 @@ import java.security.SecureRandom;
public interface XidGenerator
extends Serializable, AutoCloseable {
- Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext,
long checkpointId);
+ Xid generateXid(JobContext context, SinkWriter.Context sinkContext, long
checkpointId);
default void open() {}
/**
* @return true if the provided transaction belongs to this subtask
*/
- boolean belongsToSubtask(Xid xid, SeaTunnelContext context,
SinkWriter.Context sinkContext);
+ boolean belongsToSubtask(Xid xid, JobContext context, SinkWriter.Context
sinkContext);
@Override
default void close() {}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index b4527a9ff..20461db9f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
@@ -53,7 +53,7 @@ public class JdbcExactlyOnceSinkWriter
private final SinkWriter.Context sinkcontext;
- private final SeaTunnelContext context;
+ private final JobContext context;
private final List<JdbcSinkState> recoverStates;
@@ -72,7 +72,7 @@ public class JdbcExactlyOnceSinkWriter
public JdbcExactlyOnceSinkWriter(
SinkWriter.Context sinkcontext,
- SeaTunnelContext context,
+ JobContext context,
JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
JdbcSinkOptions jdbcSinkOptions,
List<JdbcSinkState> states) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 672303f8c..97a63b980 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -51,7 +51,7 @@ public class JdbcSink
private SeaTunnelRowType seaTunnelRowType;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
private JdbcSinkOptions jdbcSinkOptions;
@@ -76,7 +76,7 @@ public class JdbcSink
if (jdbcSinkOptions.isExactlyOnce()) {
sinkWriter = new JdbcExactlyOnceSinkWriter(
context,
- seaTunnelContext,
+ jobContext,
statementBuilder,
jdbcSinkOptions,
new ArrayList<>()
@@ -98,7 +98,7 @@ public class JdbcSink
JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) ->
JdbcUtils.setRecordToStatement(st, null, row);
return new JdbcExactlyOnceSinkWriter(
context,
- seaTunnelContext,
+ jobContext,
statementBuilder,
jdbcSinkOptions,
states
@@ -132,8 +132,8 @@ public class JdbcSink
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 2717436f1..a8d0766f3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -57,7 +56,6 @@ import java.util.Map;
public class JdbcSource implements SeaTunnelSource<SeaTunnelRow,
JdbcSourceSplit, JdbcSourceState> {
protected static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
- private SeaTunnelContext seaTunnelContext;
private JdbcSourceOptions jdbcSourceOptions;
private SeaTunnelRowType typeInfo;
@@ -96,11 +94,6 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
);
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index a4d534e67..62314b87a 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -23,8 +23,8 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONS
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -53,11 +53,11 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
private final ConsumerMetadata metadata = new ConsumerMetadata();
private SeaTunnelRowType typeInfo;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
@Override
public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ return JobMode.BATCH.equals(jobContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}
@Override
@@ -119,7 +119,7 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index 65b004a6a..c10796a21 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.source;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Boundedness;
@@ -53,7 +52,6 @@ import java.util.List;
public class KuduSource implements SeaTunnelSource<SeaTunnelRow,
KuduSourceSplit, KuduSourceState> {
private static final Logger LOGGER =
LoggerFactory.getLogger(KuduSource.class);
- private SeaTunnelContext seaTunnelContext;
private SeaTunnelRowType rowTypeInfo;
private KuduInputFormat kuduInputFormat;
private PartitionParameter partitionParameter;
@@ -167,11 +165,6 @@ public class KuduSource implements
SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
return new PartitionParameter(keyColumn, Long.parseLong(minKey + ""),
Long.parseLong(maxKey + ""));
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema>
columnSchemaList) {
ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
index 077097569..45c79048b 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
@@ -25,7 +25,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbCo
import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -50,8 +49,6 @@ import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSource.class)
public class MongodbSource extends AbstractSingleSplitSource<SeaTunnelRow> {
- private SeaTunnelContext seaTunnelContext;
-
private SeaTunnelRowType rowType;
private MongodbParameters params;
@@ -91,11 +88,6 @@ public class MongodbSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index d860e6af5..407d32222 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.redis.source;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -43,7 +42,6 @@ import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSource.class)
public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private final RedisParameters redisParameters = new RedisParameters();
- private SeaTunnelContext seaTunnelContext;
private SeaTunnelRowType seaTunnelRowType;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
@@ -87,11 +85,6 @@ public class RedisSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
return seaTunnelRowType;
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public AbstractSingleSplitReader<SeaTunnelRow>
createReader(SingleSplitReaderContext readerContext) throws Exception {
return new RedisSourceReader(redisParameters, readerContext,
deserializationSchema);
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
index a237679d4..f97e00e93 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.socket.source;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -38,11 +38,11 @@ import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSource.class)
public class SocketSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private SocketSourceParameter parameter;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
@Override
public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ return JobMode.BATCH.equals(jobContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}
@Override
@@ -56,8 +56,8 @@ public class SocketSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
@Override
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 5e3178bfb..1b680b9e8 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.util.TableUtil;
@@ -41,6 +42,7 @@ public abstract class AbstractPluginExecuteProcessor<T>
implements PluginExecute
protected final FlinkEnvironment flinkEnvironment;
protected final List<? extends Config> pluginConfigs;
+ protected final JobContext jobContext;
protected final List<T> plugins;
protected static final String ENGINE_TYPE = "seatunnel";
protected static final String PLUGIN_NAME = "plugin_name";
@@ -57,8 +59,10 @@ public abstract class AbstractPluginExecuteProcessor<T>
implements PluginExecute
};
protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
+ JobContext jobContext,
List<? extends Config>
pluginConfigs) {
this.flinkEnvironment = flinkEnvironment;
+ this.jobContext = jobContext;
this.pluginConfigs = pluginConfigs;
this.plugins = initializePlugins(pluginConfigs);
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index df452dd85..e6f5719fa 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.config.EngineType;
import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
@@ -55,10 +55,11 @@ public class FlinkExecution implements TaskExecution {
public FlinkExecution(Config config) {
this.config = config;
this.flinkEnvironment = new
EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
-
SeaTunnelContext.getContext().setJobMode(flinkEnvironment.getJobMode());
- this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
- this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
- this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
+ JobContext jobContext = new JobContext();
+ jobContext.setJobMode(flinkEnvironment.getJobMode());
+ this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(flinkEnvironment, jobContext,
config.getConfigList("source"));
+ this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(flinkEnvironment, jobContext,
config.getConfigList("transform"));
+ this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(flinkEnvironment, jobContext,
config.getConfigList("sink"));
registerPlugin();
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 8de5d422d..6a06516d5 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -46,8 +46,9 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
private static final String PLUGIN_TYPE = "sink";
protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
+ JobContext jobContext,
List<? extends Config> pluginConfigs) {
- super(flinkEnvironment, pluginConfigs);
+ super(flinkEnvironment, jobContext, pluginConfigs);
}
@Override
@@ -60,7 +61,7 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> seaTunnelSink =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
- seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+ seaTunnelSink.setJobContext(jobContext);
return seaTunnelSink;
}).distinct().collect(Collectors.toList());
flinkEnvironment.registerPlugin(pluginJars);
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index a1b31836f..1dc378a83 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.common.constants.JobMode;
@@ -53,8 +53,9 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
private static final String PLUGIN_TYPE = "source";
public SourceExecuteProcessor(FlinkEnvironment flinkEnvironment,
+ JobContext jobContext,
List<? extends Config> sourceConfigs) {
- super(flinkEnvironment, sourceConfigs);
+ super(flinkEnvironment, jobContext, sourceConfigs);
}
@Override
@@ -110,8 +111,8 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
SeaTunnelSource seaTunnelSource =
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
- if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
+ seaTunnelSource.setJobContext(jobContext);
+ if (jobContext.getJobMode() == JobMode.BATCH
&& seaTunnelSource.getBoundedness() ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
throw new UnsupportedOperationException(String.format("'%s'
source don't support off-line job.", seaTunnelSource.getPluginName()));
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index d077f6d3a..62d65fbcd 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
@@ -39,8 +40,9 @@ public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<Fl
private static final String PLUGIN_TYPE = "transform";
protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
+ JobContext jobContext,
List<? extends Config> pluginConfigs) {
- super(flinkEnvironment, pluginConfigs);
+ super(flinkEnvironment, jobContext, pluginConfigs);
}
@Override
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
index 0dad6f680..af087c0c4 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -34,13 +35,16 @@ public abstract class AbstractPluginExecuteProcessor<T>
implements PluginExecute
protected final SparkEnvironment sparkEnvironment;
protected final List<? extends Config> pluginConfigs;
+ protected final JobContext jobContext;
protected final List<T> plugins;
protected static final String ENGINE_TYPE = "seatunnel";
protected static final String PLUGIN_NAME = "plugin_name";
protected AbstractPluginExecuteProcessor(SparkEnvironment sparkEnvironment,
+ JobContext jobContext,
List<? extends Config>
pluginConfigs) {
this.sparkEnvironment = sparkEnvironment;
+ this.jobContext = jobContext;
this.pluginConfigs = pluginConfigs;
this.plugins = initializePlugins(pluginConfigs);
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 45ab01012..fbcecd7c8 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -43,8 +43,9 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
private static final String PLUGIN_TYPE = "sink";
protected SinkExecuteProcessor(SparkEnvironment sparkEnvironment,
+ JobContext jobContext,
List<? extends Config> pluginConfigs) {
- super(sparkEnvironment, pluginConfigs);
+ super(sparkEnvironment, jobContext, pluginConfigs);
}
@Override
@@ -56,7 +57,7 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
- seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+ seaTunnelSink.setJobContext(jobContext);
return seaTunnelSink;
}).distinct().collect(Collectors.toList());
sparkEnvironment.registerPlugin(pluginJars);
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 8dfcb4ef4..f6eafe67c 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -44,8 +44,9 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
private static final String PLUGIN_TYPE = "source";
public SourceExecuteProcessor(SparkEnvironment sparkEnvironment,
+ JobContext jobContext,
List<? extends Config> sourceConfigs) {
- super(sparkEnvironment, sourceConfigs);
+ super(sparkEnvironment, jobContext, sourceConfigs);
}
@Override
@@ -75,7 +76,7 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
SeaTunnelSource<?, ?, ?> seaTunnelSource =
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+ seaTunnelSource.setJobContext(jobContext);
sources.add(seaTunnelSource);
}
sparkEnvironment.registerPlugin(new ArrayList<>(jars));
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 31193e0cf..c1d0c9c22 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.core.starter.config.EngineType;
import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
@@ -47,10 +47,11 @@ public class SparkExecution {
public SparkExecution(Config config) {
this.config = config;
this.sparkEnvironment = (SparkEnvironment) new
EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
-
SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
- this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(sparkEnvironment,
config.getConfigList(Constants.SOURCE));
- this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(sparkEnvironment,
config.getConfigList(Constants.TRANSFORM));
- this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(sparkEnvironment, config.getConfigList(Constants.SINK));
+ JobContext jobContext = new JobContext();
+ jobContext.setJobMode(sparkEnvironment.getJobMode());
+ this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(sparkEnvironment, jobContext,
config.getConfigList(Constants.SOURCE));
+ this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(sparkEnvironment, jobContext,
config.getConfigList(Constants.TRANSFORM));
+ this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(sparkEnvironment, jobContext,
config.getConfigList(Constants.SINK));
}
public void execute() throws TaskExecuteException {
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 5668457b7..66d5e1ee6 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
@@ -39,8 +40,9 @@ public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<Ba
private static final String PLUGIN_TYPE = "transform";
protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment,
+ JobContext jobContext,
List<? extends Config> pluginConfigs) {
- super(sparkEnvironment, pluginConfigs);
+ super(sparkEnvironment, jobContext, pluginConfigs);
}
@Override