This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8d2fafcf26 [Feature][Zeta][REST-API] Stop a running job. (#5512)
8d2fafcf26 is described below

commit 8d2fafcf269de1be6964e39dec4b88c6e43d4afb
Author: Guangdong Liu <[email protected]>
AuthorDate: Wed Oct 4 09:13:08 2023 +0800

    [Feature][Zeta][REST-API] Stop a running job. (#5512)
---
 docs/en/seatunnel-engine/rest-api.md               |  24 ++++
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 152 ++++++++++++++++-----
 .../seatunnel/engine/server/rest/RestConstant.java |  23 ++++
 .../server/rest/RestHttpGetCommandProcessor.java   |  29 ++--
 .../server/rest/RestHttpPostCommandProcessor.java  |  66 +++++++--
 .../seatunnel/engine/server/utils/RestUtil.java    |   7 +-
 6 files changed, 239 insertions(+), 62 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api.md 
b/docs/en/seatunnel-engine/rest-api.md
index 2f44421a3d..3f8cf910ea 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -238,3 +238,27 @@ network:
 
 
------------------------------------------------------------------------------------------
 
+### Stop Job.
+
+<details>
+<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> 
<code>(Returns jobId if job stopped successfully.)</code></summary>
+
+#### Body
+
+```json
+{
+    "jobId": 733584788375666689,
+    "isStopWithSavePoint": false # if job is stopped with save point
+}
+```
+
+#### Responses
+
+```json
+{
+    "jobId": 733584788375666689
+}
+```
+
+</details>
+
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index d38d1c732f..d896ec17bf 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -32,9 +32,9 @@ import 
org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 import org.apache.seatunnel.engine.server.rest.RestConstant;
 
 import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import com.hazelcast.client.config.ClientConfig;
@@ -57,8 +57,8 @@ public class RestApiIT {
 
     private static HazelcastInstanceImpl hazelcastInstance;
 
-    @BeforeAll
-    static void beforeClass() throws Exception {
+    @BeforeEach
+    void beforeClass() throws Exception {
         String testClusterName = TestUtils.getClusterName("RestApiIT");
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
         seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
@@ -136,10 +136,124 @@ public class RestApiIT {
 
     @Test
     public void testSubmitJob() {
+        String jobId = 
submitJob("BATCH").getBody().jsonPath().getString("jobId");
+        SeaTunnelServer seaTunnelServer =
+                (SeaTunnelServer)
+                        hazelcastInstance
+                                .node
+                                .getNodeExtension()
+                                .createExtensionServices()
+                                .get(Constant.SEATUNNEL_SERVICE_NAME);
+        JobStatus jobStatus =
+                
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
+        Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.FINISHED,
+                                        seaTunnelServer
+                                                .getCoordinatorService()
+                                                
.getJobStatus(Long.parseLong(jobId))));
+    }
+
+    @Test
+    public void testStopJob() {
+        String jobId = 
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+        SeaTunnelServer seaTunnelServer =
+                (SeaTunnelServer)
+                        hazelcastInstance
+                                .node
+                                .getNodeExtension()
+                                .createExtensionServices()
+                                .get(Constant.SEATUNNEL_SERVICE_NAME);
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.RUNNING,
+                                        seaTunnelServer
+                                                .getCoordinatorService()
+                                                
.getJobStatus(Long.parseLong(jobId))));
+
+        String parameters = "{" + "\"jobId\":" + jobId + "," + 
"\"isStopWithSavePoint\":true}";
+
+        given().body(parameters)
+                .post(
+                        HOST
+                                + hazelcastInstance
+                                        .getCluster()
+                                        .getLocalMember()
+                                        .getAddress()
+                                        .getPort()
+                                + RestConstant.STOP_JOB_URL)
+                .then()
+                .statusCode(200)
+                .body("jobId", equalTo(jobId));
+
+        Awaitility.await()
+                .atMost(6, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.FINISHED,
+                                        seaTunnelServer
+                                                .getCoordinatorService()
+                                                
.getJobStatus(Long.parseLong(jobId))));
+
+        String jobId2 = 
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.RUNNING,
+                                        seaTunnelServer
+                                                .getCoordinatorService()
+                                                
.getJobStatus(Long.parseLong(jobId2))));
+        parameters = "{" + "\"jobId\":" + jobId2 + "," + 
"\"isStopWithSavePoint\":false}";
+
+        given().body(parameters)
+                .post(
+                        HOST
+                                + hazelcastInstance
+                                        .getCluster()
+                                        .getLocalMember()
+                                        .getAddress()
+                                        .getPort()
+                                + RestConstant.STOP_JOB_URL)
+                .then()
+                .statusCode(200)
+                .body("jobId", equalTo(jobId2));
+
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.CANCELED,
+                                        seaTunnelServer
+                                                .getCoordinatorService()
+                                                
.getJobStatus(Long.parseLong(jobId2))));
+    }
+
+    @AfterEach
+    void afterClass() {
+        if (hazelcastInstance != null) {
+            hazelcastInstance.shutdown();
+        }
+    }
+
+    private Response submitJob(String jobMode) {
         String requestBody =
                 "{\n"
                         + "    \"env\": {\n"
-                        + "        \"job.mode\": \"batch\"\n"
+                        + "        \"job.mode\": \""
+                        + jobMode
+                        + "\"\n"
                         + "    },\n"
                         + "    \"source\": [\n"
                         + "        {\n"
@@ -181,32 +295,6 @@ public class RestApiIT {
                                         + parameters);
 
         response.then().statusCode(200).body("jobName", equalTo("test"));
-        String jobId = response.getBody().jsonPath().getString("jobId");
-        SeaTunnelServer seaTunnelServer =
-                (SeaTunnelServer)
-                        hazelcastInstance
-                                .node
-                                .getNodeExtension()
-                                .createExtensionServices()
-                                .get(Constant.SEATUNNEL_SERVICE_NAME);
-        JobStatus jobStatus =
-                
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
-        Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
-        Awaitility.await()
-                .atMost(2, TimeUnit.MINUTES)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertEquals(
-                                        JobStatus.FINISHED,
-                                        seaTunnelServer
-                                                .getCoordinatorService()
-                                                
.getJobStatus(Long.parseLong(jobId))));
-    }
-
-    @AfterAll
-    static void afterClass() {
-        if (hazelcastInstance != null) {
-            hazelcastInstance.shutdown();
-        }
+        return response;
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 7776d592b8..c3178e3672 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -19,10 +19,33 @@ package org.apache.seatunnel.engine.server.rest;
 
 public class RestConstant {
 
+    public static final String JOB_ID = "jobId";
+
+    public static final String JOB_NAME = "jobName";
+
+    public static final String IS_START_WITH_SAVE_POINT = 
"isStartWithSavePoint";
+
+    public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint";
+
+    public static final String JOB_STATUS = "jobStatus";
+
+    public static final String CREATE_TIME = "createTime";
+
+    public static final String ENV_OPTIONS = "envOptions";
+
+    public static final String JOB_DAG = "jobDag";
+
+    public static final String PLUGIN_JARS_URLS = "pluginJarsUrls";
+
+    public static final String JAR_PATH = "jarPath";
+
+    public static final String METRICS = "metrics";
     public static final String RUNNING_JOBS_URL = 
"/hazelcast/rest/maps/running-jobs";
     public static final String RUNNING_JOB_URL = 
"/hazelcast/rest/maps/running-job";
     public static final String SUBMIT_JOB_URL = 
"/hazelcast/rest/maps/submit-job";
 
     public static final String SYSTEM_MONITORING_INFORMATION =
             "/hazelcast/rest/maps/system-monitoring-information";
+
+    public static final String STOP_JOB_URL = "/hazelcast/rest/maps/stop-job";
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 4c1debd6f8..9addb8d8ec 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -203,8 +203,8 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
         } catch (JsonProcessingException | NullPointerException e) {
             return metricsMap;
         }
-        metricsMap.put("sourceReceivedCount", sourceReadCount);
-        metricsMap.put("sinkWriteCount", sinkWriteCount);
+        metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
+        metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);
 
         return metricsMap;
     }
@@ -243,28 +243,33 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
         JobStatus jobStatus = 
getSeaTunnelServer().getCoordinatorService().getJobStatus(jobId);
 
         jobInfoJson
-                .add("jobId", String.valueOf(jobId))
-                .add("jobName", logicalDag.getJobConfig().getName())
-                .add("jobStatus", jobStatus.toString())
-                .add("envOptions", 
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
+                .add(RestConstant.JOB_ID, String.valueOf(jobId))
+                .add(RestConstant.JOB_NAME, 
logicalDag.getJobConfig().getName())
+                .add(RestConstant.JOB_STATUS, jobStatus.toString())
                 .add(
-                        "createTime",
+                        RestConstant.ENV_OPTIONS,
+                        
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
+                .add(
+                        RestConstant.CREATE_TIME,
                         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                 .format(new 
Date(jobImmutableInformation.getCreateTime())))
-                .add("jobDag", logicalDag.getLogicalDagAsJson())
+                .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
                 .add(
-                        "pluginJarsUrls",
+                        RestConstant.PLUGIN_JARS_URLS,
                         (JsonValue)
                                 
jobImmutableInformation.getPluginJarsUrls().stream()
                                         .map(
                                                 url -> {
                                                     JsonObject jarUrl = new 
JsonObject();
-                                                    jarUrl.add("jarPath", 
url.toString());
+                                                    jarUrl.add(
+                                                            
RestConstant.JAR_PATH, url.toString());
                                                     return jarUrl;
                                                 })
                                         .collect(JsonArray::new, 
JsonArray::add, JsonArray::add))
-                .add("isStartWithSavePoint", 
jobImmutableInformation.isStartWithSavePoint())
-                .add("metrics", 
JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
+                .add(
+                        RestConstant.IS_START_WITH_SAVE_POINT,
+                        jobImmutableInformation.isStartWithSavePoint())
+                .add(RestConstant.METRICS, 
JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
 
         return jobInfoJson;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index e0edd93203..66a9131f65 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.rest;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -42,6 +43,7 @@ import java.util.Map;
 
 import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400;
 import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.STOP_JOB_URL;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL;
 
 public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostCommand> {
@@ -66,6 +68,8 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
         try {
             if (uri.startsWith(SUBMIT_JOB_URL)) {
                 handleSubmitJob(httpPostCommand, uri);
+            } else if (uri.startsWith(STOP_JOB_URL)) {
+                handleStopJob(httpPostCommand, uri);
             } else {
                 original.handle(httpPostCommand);
             }
@@ -89,26 +93,17 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
             throws IllegalArgumentException {
         Map<String, String> requestParams = new HashMap<>();
         RestUtil.buildRequestParams(requestParams, uri);
-        byte[] requestBody = httpPostCommand.getData();
-        if (requestBody.length == 0) {
-            throw new IllegalArgumentException("Request body is empty.");
-        }
-        JsonNode requestBodyJsonNode;
-        try {
-            requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
-        } catch (IOException e) {
-            throw new IllegalArgumentException("Invalid JSON format in request 
body.");
-        }
-        Config config = RestUtil.buildConfig(requestBodyJsonNode);
+        Config config = RestUtil.buildConfig(requestHandle(httpPostCommand));
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setName(requestParams.get("jobName"));
+        jobConfig.setName(requestParams.get(RestConstant.JOB_NAME));
         JobImmutableInformationEnv jobImmutableInformationEnv =
                 new JobImmutableInformationEnv(
                         jobConfig,
                         config,
                         textCommandService.getNode(),
-                        
Boolean.parseBoolean(requestParams.get("isStartWithSavePoint")),
-                        Long.parseLong(requestParams.get("jobId")));
+                        Boolean.parseBoolean(
+                                
requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT)),
+                        
Long.parseLong(requestParams.get(RestConstant.JOB_ID)));
         JobImmutableInformation jobImmutableInformation = 
jobImmutableInformationEnv.build();
         CoordinatorService coordinatorService = 
getSeaTunnelServer().getCoordinatorService();
         Data data =
@@ -125,11 +120,52 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
         Long jobId = jobImmutableInformationEnv.getJobId();
         this.prepareResponse(
                 httpPostCommand,
-                new JsonObject().add("jobId", jobId).add("jobName", 
requestParams.get("jobName")));
+                new JsonObject()
+                        .add(RestConstant.JOB_ID, jobId)
+                        .add(RestConstant.JOB_NAME, 
requestParams.get(RestConstant.JOB_NAME)));
+    }
+
+    private void handleStopJob(HttpPostCommand httpPostCommand, String uri) {
+        Map<String, Object> map = 
JsonUtils.toMap(requestHandle(httpPostCommand));
+        boolean isStopWithSavePoint = false;
+        if (map.get(RestConstant.JOB_ID) == null) {
+            throw new IllegalArgumentException("jobId cannot be empty.");
+        }
+        long jobId = Long.parseLong(map.get(RestConstant.JOB_ID).toString());
+        if (map.get(RestConstant.IS_STOP_WITH_SAVE_POINT) != null) {
+            isStopWithSavePoint =
+                    
Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
+        }
+
+        CoordinatorService coordinatorService = 
getSeaTunnelServer().getCoordinatorService();
+
+        if (isStopWithSavePoint) {
+            coordinatorService.savePoint(jobId);
+        } else {
+            coordinatorService.cancelJob(jobId);
+        }
+
+        this.prepareResponse(
+                httpPostCommand,
+                new JsonObject().add(RestConstant.JOB_ID, 
map.get(RestConstant.JOB_ID).toString()));
     }
 
     @Override
     public void handleRejection(HttpPostCommand httpPostCommand) {
         handle(httpPostCommand);
     }
+
+    private JsonNode requestHandle(HttpPostCommand httpPostCommand) {
+        byte[] requestBody = httpPostCommand.getData();
+        if (requestBody.length == 0) {
+            throw new IllegalArgumentException("Request body is empty.");
+        }
+        JsonNode requestBodyJsonNode;
+        try {
+            requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Invalid JSON format in request 
body.");
+        }
+        return requestBodyJsonNode;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
index d3761366d0..51c50d85a2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
 
 import com.hazelcast.internal.util.StringUtil;
 
@@ -40,9 +41,9 @@ public class RestUtil {
     }
 
     public static void buildRequestParams(Map<String, String> requestParams, 
String uri) {
-        requestParams.put("jobId", null);
-        requestParams.put("jobName", Constants.LOGO);
-        requestParams.put("isStartWithSavePoint", String.valueOf(false));
+        requestParams.put(RestConstant.JOB_ID, null);
+        requestParams.put(RestConstant.JOB_NAME, Constants.LOGO);
+        requestParams.put(RestConstant.IS_START_WITH_SAVE_POINT, 
String.valueOf(false));
         uri = StringUtil.stripTrailingSlash(uri);
         if (!uri.contains("?")) {
             return;

Reply via email to