This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9980aa44d2 [Fix][Rest-API] fix submit job api (#5702)
9980aa44d2 is described below
commit 9980aa44d296f4bf29f707babbb6536e6f3e2047
Author: Jarvis <[email protected]>
AuthorDate: Fri Oct 27 10:34:46 2023 +0800
[Fix][Rest-API] fix submit job api (#5702)
---
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 28 +++++++++++++++-------
.../server/job/RestJobExecutionEnvironment.java | 2 +-
.../server/rest/RestHttpPostCommandProcessor.java | 14 ++++++++---
.../seatunnel/engine/server/utils/RestUtil.java | 4 ++++
4 files changed, 35 insertions(+), 13 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index fec5dc65f2..88eb82e1d5 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.engine.e2e;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
@@ -63,7 +61,6 @@ public class RestApiIT {
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
hazelcastInstance =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- Common.setDeployMode(DeployMode.CLIENT);
String filePath =
TestUtils.getResource("stream_fakesource_to_file.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");
@@ -136,7 +133,9 @@ public class RestApiIT {
@Test
public void testSubmitJob() {
- String jobId =
submitJob("BATCH").getBody().jsonPath().getString("jobId");
+ Response response = submitJob("BATCH");
+ response.then().statusCode(200).body("jobName", equalTo("test"));
+ String jobId = response.getBody().jsonPath().getString("jobId");
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
hazelcastInstance
@@ -246,6 +245,14 @@ public class RestApiIT {
.getJobStatus(Long.parseLong(jobId2))));
}
+ @Test
+ public void testStartWithSavePointWithoutJobId() {
+ Response response = submitJob("BATCH", true);
+ response.then()
+ .statusCode(400)
+ .body("message", equalTo("Please provide jobId when start with
save point."));
+ }
+
@AfterEach
void afterClass() {
if (hazelcastInstance != null) {
@@ -254,6 +261,10 @@ public class RestApiIT {
}
private Response submitJob(String jobMode) {
+ return submitJob(jobMode, false);
+ }
+
+ private Response submitJob(String jobMode, boolean isStartWithSavePoint) {
String requestBody =
"{\n"
+ " \"env\": {\n"
@@ -284,9 +295,10 @@ public class RestApiIT {
+ " }\n"
+ " ]\n"
+ "}";
- String parameters = "jobId=1&jobName=test&isStartWithSavePoint=false";
- // Only jobName is compared because jobId is randomly generated if
isStartWithSavePoint is
- // false
+ String parameters = "jobName=test";
+ if (isStartWithSavePoint) {
+ parameters = parameters + "&isStartWithSavePoint=true";
+ }
Response response =
given().body(requestBody)
.post(
@@ -299,8 +311,6 @@ public class RestApiIT {
+ RestConstant.SUBMIT_JOB_URL
+ "?"
+ parameters);
-
- response.then().statusCode(200).body("jobName", equalTo("test"));
return response;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java
index 7111e64710..c3a41889c9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java
@@ -55,7 +55,7 @@ public class RestJobExecutionEnvironment extends
AbstractJobEnvironment {
.getHazelcastInstance()
.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)
.newId()));
- this.jobId = Long.valueOf(jobConfig.getJobContext().getJobId());
+ this.jobId = Long.valueOf(this.jobConfig.getJobContext().getJobId());
}
public Long getJobId() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index b1d66a4e39..e248048234 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.engine.server.rest;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -96,14 +98,20 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
Config config = RestUtil.buildConfig(requestHandle(httpPostCommand));
JobConfig jobConfig = new JobConfig();
jobConfig.setName(requestParams.get(RestConstant.JOB_NAME));
+ if (Common.getDeployMode() == null) {
+ Common.setDeployMode(DeployMode.CLIENT);
+ }
+ boolean startWithSavePoint =
+
Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT));
RestJobExecutionEnvironment restJobExecutionEnvironment =
new RestJobExecutionEnvironment(
jobConfig,
config,
textCommandService.getNode(),
- Boolean.parseBoolean(
-
requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT)),
-
Long.parseLong(requestParams.get(RestConstant.JOB_ID)));
+ startWithSavePoint,
+ startWithSavePoint
+ ?
Long.parseLong(requestParams.get(RestConstant.JOB_ID))
+ : null);
JobImmutableInformation jobImmutableInformation =
restJobExecutionEnvironment.build();
CoordinatorService coordinatorService =
getSeaTunnelServer().getCoordinatorService();
Data data =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
index 51c50d85a2..d2ac601890 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
@@ -57,6 +57,10 @@ public class RestUtil {
} catch (IndexOutOfBoundsException e) {
throw new IllegalArgumentException("Invalid Params format in
Params.");
}
+ if
(Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT))
+ && requestParams.get(RestConstant.JOB_ID) == null) {
+ throw new IllegalArgumentException("Please provide jobId when
start with save point.");
+ }
}
public static Config buildConfig(JsonNode jsonNode) {