This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 8fbf1f60a Fix JobContext#jobId error (#2744)
8fbf1f60a is described below
commit 8fbf1f60a9469d28b7351a581dc7034a12ab1f73
Author: Eric <[email protected]>
AuthorDate: Fri Sep 16 17:11:48 2022 +0800
Fix JobContext#jobId error (#2744)
---
.../main/java/org/apache/seatunnel/api/common/JobContext.java | 4 ++++
.../apache/seatunnel/engine/client/job/JobConfigParser.java | 10 ++++------
.../seatunnel/engine/client/job/JobExecutionEnvironment.java | 10 ++++++----
.../apache/seatunnel/engine/client/JobConfigParserTest.java | 9 +++++++--
.../seatunnel/engine/client/LogicalDagGeneratorTest.java | 4 +++-
5 files changed, 24 insertions(+), 13 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
index 9e56de36a..854dd7ac7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
@@ -45,6 +45,10 @@ public final class JobContext implements Serializable {
this.jobId = UUID.randomUUID().toString().replace("-", "");
}
+ public JobContext(Long jobId) {
+ this.jobId = jobId + "";
+ }
+
/**
* Put table schema.
*
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index b1e5aa1fb..51eba4d29 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.engine.client.job;
-import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -109,13 +108,11 @@ public class JobConfigParser {
}
private void jobConfigAnalyze(@NonNull Config envConfigs) {
- JobContext jobContext = new JobContext();
if (envConfigs.hasPath("job.mode")) {
- jobContext.setJobMode(envConfigs.getEnum(JobMode.class, "job.mode"));
+ jobConfig.getJobContext().setJobMode(envConfigs.getEnum(JobMode.class, "job.mode"));
} else {
- jobContext.setJobMode(JobMode.BATCH);
+ jobConfig.getJobContext().setJobMode(JobMode.BATCH);
}
- jobConfig.setJobContext(jobContext);
}
/**
@@ -293,7 +290,8 @@ public class JobConfigParser {
}
ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>>
- sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), jobConfig.getJobContext());
+ sinkListImmutablePair =
+ ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), jobConfig.getJobContext());
SinkAction sinkAction = createSinkAction(
idGenerator.getNextId(),
sinkListImmutablePair.getLeft().getPluginName(),
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 7176867a2..57b5a8e3e 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.client.job;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -35,8 +36,6 @@ import java.util.concurrent.ExecutionException;
public class JobExecutionEnvironment {
- private static String DEFAULT_JOB_NAME = "test_st_job";
-
private JobConfig jobConfig;
private int maxParallelism = 1;
@@ -51,12 +50,16 @@ public class JobExecutionEnvironment {
private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
+ private final JobClient jobClient;
+
public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath,
SeaTunnelHazelcastClient
seaTunnelHazelcastClient) {
this.jobConfig = jobConfig;
this.jobFilePath = jobFilePath;
this.idGenerator = new IdGenerator();
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
+ this.jobClient = new JobClient(seaTunnelHazelcastClient);
+ this.jobConfig.setJobContext(new JobContext(jobClient.getNewJobId()));
}
private JobConfigParser getJobConfigParser() {
@@ -76,9 +79,8 @@ public class JobExecutionEnvironment {
}
public ClientJobProxy execute() throws ExecutionException, InterruptedException {
- JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
- jobClient.getNewJobId(),
+ Long.valueOf(jobConfig.getJobContext().getJobId()),
seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
jobConfig,
jarUrls);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index a04932eeb..ba1f6d0d2 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.client;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.job.JobConfigParser;
@@ -42,7 +43,9 @@ public class JobConfigParserTest {
public void testSimpleJobParse() {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
- JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), new JobConfig());
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setJobContext(new JobContext());
+ JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), jobConfig);
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assert.assertEquals(1, actions.size());
@@ -59,7 +62,9 @@ public class JobConfigParserTest {
public void testComplexJobParse() {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
- JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), new JobConfig());
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setJobContext(new JobContext());
+ JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), jobConfig);
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assert.assertEquals(1, actions.size());
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 1c0da5503..66928e0e9 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.client;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.job.JobConfigParser;
@@ -45,10 +46,11 @@ public class LogicalDagGeneratorTest {
String filePath = TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");
+ jobConfig.setJobContext(new JobContext());
IdGenerator idGenerator = new IdGenerator();
ImmutablePair<List<Action>, Set<URL>> immutablePair =
- new JobConfigParser(filePath, idGenerator, new JobConfig()).parse();
+ new JobConfigParser(filePath, idGenerator, jobConfig).parse();
LogicalDagGenerator logicalDagGenerator =
new LogicalDagGenerator(immutablePair.getLeft(), jobConfig,
idGenerator);