This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9980aa44d2 [Fix][Rest-API] fix submit job api (#5702)
9980aa44d2 is described below

commit 9980aa44d296f4bf29f707babbb6536e6f3e2047
Author: Jarvis <[email protected]>
AuthorDate: Fri Oct 27 10:34:46 2023 +0800

    [Fix][Rest-API] fix submit job api (#5702)
---
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 28 +++++++++++++++-------
 .../server/job/RestJobExecutionEnvironment.java    |  2 +-
 .../server/rest/RestHttpPostCommandProcessor.java  | 14 ++++++++---
 .../seatunnel/engine/server/utils/RestUtil.java    |  4 ++++
 4 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index fec5dc65f2..88eb82e1d5 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.engine.e2e;
 
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
@@ -63,7 +61,6 @@ public class RestApiIT {
         SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
         seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
         hazelcastInstance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
-        Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("stream_fakesource_to_file.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_file");
@@ -136,7 +133,9 @@ public class RestApiIT {
 
     @Test
     public void testSubmitJob() {
-        String jobId = submitJob("BATCH").getBody().jsonPath().getString("jobId");
+        Response response = submitJob("BATCH");
+        response.then().statusCode(200).body("jobName", equalTo("test"));
+        String jobId = response.getBody().jsonPath().getString("jobId");
         SeaTunnelServer seaTunnelServer =
                 (SeaTunnelServer)
                         hazelcastInstance
@@ -246,6 +245,14 @@ public class RestApiIT {
                                                 
.getJobStatus(Long.parseLong(jobId2))));
     }
 
+    @Test
+    public void testStartWithSavePointWithoutJobId() {
+        Response response = submitJob("BATCH", true);
+        response.then()
+                .statusCode(400)
+                .body("message", equalTo("Please provide jobId when start with save point."));
+    }
+
     @AfterEach
     void afterClass() {
         if (hazelcastInstance != null) {
@@ -254,6 +261,10 @@ public class RestApiIT {
     }
 
     private Response submitJob(String jobMode) {
+        return submitJob(jobMode, false);
+    }
+
+    private Response submitJob(String jobMode, boolean isStartWithSavePoint) {
         String requestBody =
                 "{\n"
                         + "    \"env\": {\n"
@@ -284,9 +295,10 @@ public class RestApiIT {
                         + "        }\n"
                         + "    ]\n"
                         + "}";
-        String parameters = "jobId=1&jobName=test&isStartWithSavePoint=false";
-        // Only jobName is compared because jobId is randomly generated if isStartWithSavePoint is
-        // false
+        String parameters = "jobName=test";
+        if (isStartWithSavePoint) {
+            parameters = parameters + "&isStartWithSavePoint=true";
+        }
         Response response =
                 given().body(requestBody)
                         .post(
@@ -299,8 +311,6 @@ public class RestApiIT {
                                         + RestConstant.SUBMIT_JOB_URL
                                         + "?"
                                         + parameters);
-
-        response.then().statusCode(200).body("jobName", equalTo("test"));
         return response;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java
index 7111e64710..c3a41889c9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java
@@ -55,7 +55,7 @@ public class RestJobExecutionEnvironment extends AbstractJobEnvironment {
                                         .getHazelcastInstance()
                                         .getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)
                                         .newId()));
-        this.jobId = Long.valueOf(jobConfig.getJobContext().getJobId());
+        this.jobId = Long.valueOf(this.jobConfig.getJobContext().getJobId());
     }
 
     public Long getJobId() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index b1d66a4e39..e248048234 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.engine.server.rest;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -96,14 +98,20 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC
         Config config = RestUtil.buildConfig(requestHandle(httpPostCommand));
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName(requestParams.get(RestConstant.JOB_NAME));
+        if (Common.getDeployMode() == null) {
+            Common.setDeployMode(DeployMode.CLIENT);
+        }
+        boolean startWithSavePoint =
+                Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT));
         RestJobExecutionEnvironment restJobExecutionEnvironment =
                 new RestJobExecutionEnvironment(
                         jobConfig,
                         config,
                         textCommandService.getNode(),
-                        Boolean.parseBoolean(
-                                requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT)),
-                        Long.parseLong(requestParams.get(RestConstant.JOB_ID)));
+                        startWithSavePoint,
+                        startWithSavePoint
+                                ? Long.parseLong(requestParams.get(RestConstant.JOB_ID))
+                                : null);
         JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build();
         CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService();
         Data data =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
index 51c50d85a2..d2ac601890 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
@@ -57,6 +57,10 @@ public class RestUtil {
         } catch (IndexOutOfBoundsException e) {
             throw new IllegalArgumentException("Invalid Params format in Params.");
         }
+        if (Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT))
+                && requestParams.get(RestConstant.JOB_ID) == null) {
+            throw new IllegalArgumentException("Please provide jobId when start with save point.");
+        }
     }
 
     public static Config buildConfig(JsonNode jsonNode) {

Reply via email to