This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8d2fafcf26 [Feature][Zeta][REST-API] Stop a running job. (#5512)
8d2fafcf26 is described below
commit 8d2fafcf269de1be6964e39dec4b88c6e43d4afb
Author: Guangdong Liu <[email protected]>
AuthorDate: Wed Oct 4 09:13:08 2023 +0800
[Feature][Zeta][REST-API] Stop a running job. (#5512)
---
docs/en/seatunnel-engine/rest-api.md | 24 ++++
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 152 ++++++++++++++++-----
.../seatunnel/engine/server/rest/RestConstant.java | 23 ++++
.../server/rest/RestHttpGetCommandProcessor.java | 29 ++--
.../server/rest/RestHttpPostCommandProcessor.java | 66 +++++++--
.../seatunnel/engine/server/utils/RestUtil.java | 7 +-
6 files changed, 239 insertions(+), 62 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api.md
b/docs/en/seatunnel-engine/rest-api.md
index 2f44421a3d..3f8cf910ea 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -238,3 +238,27 @@ network:
------------------------------------------------------------------------------------------
+### Stop Job.
+
+<details>
+<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code>
<code>(Returns jobId if job stopped successfully.)</code></summary>
+
+#### Body
+
+```json
+{
+ "jobId": 733584788375666689,
+ "isStopWithSavePoint": false # set to true to stop the job with a save point
+}
+```
+
+#### Responses
+
+```json
+{
+"jobId": 733584788375666689
+}
+```
+
+</details>
+
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index d38d1c732f..d896ec17bf 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -32,9 +32,9 @@ import
org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.rest.RestConstant;
import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.hazelcast.client.config.ClientConfig;
@@ -57,8 +57,8 @@ public class RestApiIT {
private static HazelcastInstanceImpl hazelcastInstance;
- @BeforeAll
- static void beforeClass() throws Exception {
+ @BeforeEach
+ void beforeClass() throws Exception {
String testClusterName = TestUtils.getClusterName("RestApiIT");
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
@@ -136,10 +136,124 @@ public class RestApiIT {
@Test
public void testSubmitJob() {
+ String jobId =
submitJob("BATCH").getBody().jsonPath().getString("jobId");
+ SeaTunnelServer seaTunnelServer =
+ (SeaTunnelServer)
+ hazelcastInstance
+ .node
+ .getNodeExtension()
+ .createExtensionServices()
+ .get(Constant.SEATUNNEL_SERVICE_NAME);
+ JobStatus jobStatus =
+
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
+ Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
+ seaTunnelServer
+ .getCoordinatorService()
+
.getJobStatus(Long.parseLong(jobId))));
+ }
+
+ @Test
+ public void testStopJob() {
+ String jobId =
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+ SeaTunnelServer seaTunnelServer =
+ (SeaTunnelServer)
+ hazelcastInstance
+ .node
+ .getNodeExtension()
+ .createExtensionServices()
+ .get(Constant.SEATUNNEL_SERVICE_NAME);
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.RUNNING,
+ seaTunnelServer
+ .getCoordinatorService()
+
.getJobStatus(Long.parseLong(jobId))));
+
+ String parameters = "{" + "\"jobId\":" + jobId + "," +
"\"isStopWithSavePoint\":true}";
+
+ given().body(parameters)
+ .post(
+ HOST
+ + hazelcastInstance
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ + RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
+ seaTunnelServer
+ .getCoordinatorService()
+
.getJobStatus(Long.parseLong(jobId))));
+
+ String jobId2 =
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.RUNNING,
+ seaTunnelServer
+ .getCoordinatorService()
+
.getJobStatus(Long.parseLong(jobId2))));
+ parameters = "{" + "\"jobId\":" + jobId2 + "," +
"\"isStopWithSavePoint\":false}";
+
+ given().body(parameters)
+ .post(
+ HOST
+ + hazelcastInstance
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ + RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId2));
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.CANCELED,
+ seaTunnelServer
+ .getCoordinatorService()
+
.getJobStatus(Long.parseLong(jobId2))));
+ }
+
+ @AfterEach
+ void afterClass() {
+ if (hazelcastInstance != null) {
+ hazelcastInstance.shutdown();
+ }
+ }
+
+ private Response submitJob(String jobMode) {
String requestBody =
"{\n"
+ " \"env\": {\n"
- + " \"job.mode\": \"batch\"\n"
+ + " \"job.mode\": \""
+ + jobMode
+ + "\"\n"
+ " },\n"
+ " \"source\": [\n"
+ " {\n"
@@ -181,32 +295,6 @@ public class RestApiIT {
+ parameters);
response.then().statusCode(200).body("jobName", equalTo("test"));
- String jobId = response.getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- JobStatus jobStatus =
-
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
- Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
- }
-
- @AfterAll
- static void afterClass() {
- if (hazelcastInstance != null) {
- hazelcastInstance.shutdown();
- }
+ return response;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 7776d592b8..c3178e3672 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -19,10 +19,33 @@ package org.apache.seatunnel.engine.server.rest;
public class RestConstant {
+ public static final String JOB_ID = "jobId";
+
+ public static final String JOB_NAME = "jobName";
+
+ public static final String IS_START_WITH_SAVE_POINT =
"isStartWithSavePoint";
+
+ public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint";
+
+ public static final String JOB_STATUS = "jobStatus";
+
+ public static final String CREATE_TIME = "createTime";
+
+ public static final String ENV_OPTIONS = "envOptions";
+
+ public static final String JOB_DAG = "jobDag";
+
+ public static final String PLUGIN_JARS_URLS = "pluginJarsUrls";
+
+ public static final String JAR_PATH = "jarPath";
+
+ public static final String METRICS = "metrics";
public static final String RUNNING_JOBS_URL =
"/hazelcast/rest/maps/running-jobs";
public static final String RUNNING_JOB_URL =
"/hazelcast/rest/maps/running-job";
public static final String SUBMIT_JOB_URL =
"/hazelcast/rest/maps/submit-job";
public static final String SYSTEM_MONITORING_INFORMATION =
"/hazelcast/rest/maps/system-monitoring-information";
+
+ public static final String STOP_JOB_URL = "/hazelcast/rest/maps/stop-job";
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 4c1debd6f8..9addb8d8ec 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -203,8 +203,8 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
} catch (JsonProcessingException | NullPointerException e) {
return metricsMap;
}
- metricsMap.put("sourceReceivedCount", sourceReadCount);
- metricsMap.put("sinkWriteCount", sinkWriteCount);
+ metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
+ metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);
return metricsMap;
}
@@ -243,28 +243,33 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
JobStatus jobStatus =
getSeaTunnelServer().getCoordinatorService().getJobStatus(jobId);
jobInfoJson
- .add("jobId", String.valueOf(jobId))
- .add("jobName", logicalDag.getJobConfig().getName())
- .add("jobStatus", jobStatus.toString())
- .add("envOptions",
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
+ .add(RestConstant.JOB_ID, String.valueOf(jobId))
+ .add(RestConstant.JOB_NAME,
logicalDag.getJobConfig().getName())
+ .add(RestConstant.JOB_STATUS, jobStatus.toString())
.add(
- "createTime",
+ RestConstant.ENV_OPTIONS,
+
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
+ .add(
+ RestConstant.CREATE_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new
Date(jobImmutableInformation.getCreateTime())))
- .add("jobDag", logicalDag.getLogicalDagAsJson())
+ .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
.add(
- "pluginJarsUrls",
+ RestConstant.PLUGIN_JARS_URLS,
(JsonValue)
jobImmutableInformation.getPluginJarsUrls().stream()
.map(
url -> {
JsonObject jarUrl = new
JsonObject();
- jarUrl.add("jarPath",
url.toString());
+ jarUrl.add(
+
RestConstant.JAR_PATH, url.toString());
return jarUrl;
})
.collect(JsonArray::new,
JsonArray::add, JsonArray::add))
- .add("isStartWithSavePoint",
jobImmutableInformation.isStartWithSavePoint())
- .add("metrics",
JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
+ .add(
+ RestConstant.IS_START_WITH_SAVE_POINT,
+ jobImmutableInformation.isStartWithSavePoint())
+ .add(RestConstant.METRICS,
JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
return jobInfoJson;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index e0edd93203..66a9131f65 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.rest;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -42,6 +43,7 @@ import java.util.Map;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.STOP_JOB_URL;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL;
public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostCommand> {
@@ -66,6 +68,8 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
try {
if (uri.startsWith(SUBMIT_JOB_URL)) {
handleSubmitJob(httpPostCommand, uri);
+ } else if (uri.startsWith(STOP_JOB_URL)) {
+ handleStopJob(httpPostCommand, uri);
} else {
original.handle(httpPostCommand);
}
@@ -89,26 +93,17 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
throws IllegalArgumentException {
Map<String, String> requestParams = new HashMap<>();
RestUtil.buildRequestParams(requestParams, uri);
- byte[] requestBody = httpPostCommand.getData();
- if (requestBody.length == 0) {
- throw new IllegalArgumentException("Request body is empty.");
- }
- JsonNode requestBodyJsonNode;
- try {
- requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
- } catch (IOException e) {
- throw new IllegalArgumentException("Invalid JSON format in request
body.");
- }
- Config config = RestUtil.buildConfig(requestBodyJsonNode);
+ Config config = RestUtil.buildConfig(requestHandle(httpPostCommand));
JobConfig jobConfig = new JobConfig();
- jobConfig.setName(requestParams.get("jobName"));
+ jobConfig.setName(requestParams.get(RestConstant.JOB_NAME));
JobImmutableInformationEnv jobImmutableInformationEnv =
new JobImmutableInformationEnv(
jobConfig,
config,
textCommandService.getNode(),
-
Boolean.parseBoolean(requestParams.get("isStartWithSavePoint")),
- Long.parseLong(requestParams.get("jobId")));
+ Boolean.parseBoolean(
+
requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT)),
+
Long.parseLong(requestParams.get(RestConstant.JOB_ID)));
JobImmutableInformation jobImmutableInformation =
jobImmutableInformationEnv.build();
CoordinatorService coordinatorService =
getSeaTunnelServer().getCoordinatorService();
Data data =
@@ -125,11 +120,52 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
Long jobId = jobImmutableInformationEnv.getJobId();
this.prepareResponse(
httpPostCommand,
- new JsonObject().add("jobId", jobId).add("jobName",
requestParams.get("jobName")));
+ new JsonObject()
+ .add(RestConstant.JOB_ID, jobId)
+ .add(RestConstant.JOB_NAME,
requestParams.get(RestConstant.JOB_NAME)));
+ }
+
+ private void handleStopJob(HttpPostCommand httpPostCommand, String uri) {
+ Map<String, Object> map =
JsonUtils.toMap(requestHandle(httpPostCommand));
+ boolean isStopWithSavePoint = false;
+ if (map.get(RestConstant.JOB_ID) == null) {
+ throw new IllegalArgumentException("jobId cannot be empty.");
+ }
+ long jobId = Long.parseLong(map.get(RestConstant.JOB_ID).toString());
+ if (map.get(RestConstant.IS_STOP_WITH_SAVE_POINT) != null) {
+ isStopWithSavePoint =
+
Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
+ }
+
+ CoordinatorService coordinatorService =
getSeaTunnelServer().getCoordinatorService();
+
+ if (isStopWithSavePoint) {
+ coordinatorService.savePoint(jobId);
+ } else {
+ coordinatorService.cancelJob(jobId);
+ }
+
+ this.prepareResponse(
+ httpPostCommand,
+ new JsonObject().add(RestConstant.JOB_ID,
map.get(RestConstant.JOB_ID).toString()));
}
@Override
public void handleRejection(HttpPostCommand httpPostCommand) {
handle(httpPostCommand);
}
+
+ private JsonNode requestHandle(HttpPostCommand httpPostCommand) {
+ byte[] requestBody = httpPostCommand.getData();
+ if (requestBody.length == 0) {
+ throw new IllegalArgumentException("Request body is empty.");
+ }
+ JsonNode requestBodyJsonNode;
+ try {
+ requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Invalid JSON format in request
body.");
+ }
+ return requestBodyJsonNode;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
index d3761366d0..51c50d85a2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
import com.hazelcast.internal.util.StringUtil;
@@ -40,9 +41,9 @@ public class RestUtil {
}
public static void buildRequestParams(Map<String, String> requestParams,
String uri) {
- requestParams.put("jobId", null);
- requestParams.put("jobName", Constants.LOGO);
- requestParams.put("isStartWithSavePoint", String.valueOf(false));
+ requestParams.put(RestConstant.JOB_ID, null);
+ requestParams.put(RestConstant.JOB_NAME, Constants.LOGO);
+ requestParams.put(RestConstant.IS_START_WITH_SAVE_POINT,
String.valueOf(false));
uri = StringUtil.stripTrailingSlash(uri);
if (!uri.contains("?")) {
return;