This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 809870a60f [Improve][RestAPI] return finished job info when job is
finished (#6576)
809870a60f is described below
commit 809870a60ff0d852468e795b61c4dfd281bb7551
Author: Jarvis <[email protected]>
AuthorDate: Wed Mar 27 10:23:24 2024 +0800
[Improve][RestAPI] return finished job info when job is finished (#6576)
---
docs/en/seatunnel-engine/rest-api.md | 18 ++++---
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 36 ++++++++++++++
.../server/rest/RestHttpGetCommandProcessor.java | 56 ++++++++++++++++++++++
3 files changed, 104 insertions(+), 6 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api.md
b/docs/en/seatunnel-engine/rest-api.md
index 4a56c7da7e..3f1069b566 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -92,8 +92,6 @@ network:
"jobId": "",
"jobName": "",
"jobStatus": "",
- "envOptions": {
- },
"createTime": "",
"jobDag": {
"vertices": [
@@ -101,16 +99,24 @@ network:
"edges": [
]
},
- "pluginJarsUrls": [
- ],
- "isStartWithSavePoint": false,
"metrics": {
"sourceReceivedCount": "",
"sinkWriteCount": ""
- }
+ },
+ "finishedTime": "",
+ "errorMsg": null,
+ "envOptions": {
+ },
+ "pluginJarsUrls": [
+ ],
+ "isStartWithSavePoint": false
}
```
+`jobId`, `jobName`, `jobStatus`, `createTime`, `jobDag`, `metrics` are always
returned.
+`envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` are returned when the job is
running.
+`finishedTime`, `errorMsg` are returned when the job is finished.
+
When we can't get the job info, the response will be:
```json
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 5427a8e1c2..c7be274ad2 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -52,6 +52,8 @@ public class RestApiIT {
private static ClientJobProxy clientJobProxy;
+ private static ClientJobProxy batchJobProxy;
+
private static HazelcastInstanceImpl node1;
private static HazelcastInstanceImpl node2;
@@ -85,6 +87,19 @@ public class RestApiIT {
() ->
Assertions.assertEquals(
JobStatus.RUNNING,
clientJobProxy.getJobStatus()));
+
+ String batchFilePath =
TestUtils.getResource("fakesource_to_console.conf");
+ JobConfig batchConf = new JobConfig();
+ batchConf.setName("fake_to_console");
+ ClientJobExecutionEnvironment batchJobExecutionEnv =
+ engineClient.createExecutionContext(batchFilePath, batchConf,
seaTunnelConfig);
+ batchJobProxy = batchJobExecutionEnv.execute();
+ Awaitility.await()
+ .atMost(5, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
batchJobProxy.getJobStatus()));
}
@Test
@@ -108,6 +123,27 @@ public class RestApiIT {
});
}
+ @Test
+ public void testGetJobById() {
+     // Ask every cluster node for the finished batch job; each must answer with its info.
+     for (HazelcastInstanceImpl instance : Arrays.asList(node2, node1)) {
+         String jobUrl =
+                 HOST
+                         + instance.getCluster().getLocalMember().getAddress().getPort()
+                         + RestConstant.RUNNING_JOB_URL
+                         + "/"
+                         + batchJobProxy.getJobId();
+         given().get(jobUrl)
+                 .then()
+                 .statusCode(200)
+                 .body("jobName", equalTo("fake_to_console"))
+                 .body("jobStatus", equalTo("FINISHED"));
+     }
+ }
+
@Test
public void testGetAnNotExistJobById() {
Arrays.asList(node2, node1)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 81a1047c74..79f29575a1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -272,8 +272,36 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
.getMap(Constant.IMAP_RUNNING_JOB_INFO)
.get(Long.valueOf(jobId));
+ JobState finishedJobState =
+ (JobState)
+ this.textCommandService
+ .getNode()
+ .getNodeEngine()
+ .getHazelcastInstance()
+ .getMap(Constant.IMAP_FINISHED_JOB_STATE)
+ .get(Long.valueOf(jobId));
if (!jobId.isEmpty() && jobInfo != null) {
this.prepareResponse(command, convertToJson(jobInfo,
Long.parseLong(jobId)));
+ } else if (!jobId.isEmpty() && finishedJobState != null) {
+ JobMetrics finishedJobMetrics =
+ (JobMetrics)
+ this.textCommandService
+ .getNode()
+ .getNodeEngine()
+ .getHazelcastInstance()
+ .getMap(Constant.IMAP_FINISHED_JOB_METRICS)
+ .get(Long.valueOf(jobId));
+ JobDAGInfo finishedJobDAGInfo =
+ (JobDAGInfo)
+ this.textCommandService
+ .getNode()
+ .getNodeEngine()
+ .getHazelcastInstance()
+
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO)
+ .get(Long.valueOf(jobId));
+ this.prepareResponse(
+ command,
+ convertToJson(finishedJobState, finishedJobMetrics,
finishedJobDAGInfo));
} else {
this.prepareResponse(command, new
JsonObject().add(RestConstant.JOB_ID, jobId));
}
@@ -411,6 +439,34 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
return jobInfoJson;
}
+ /**
+  * Builds the REST response body for a job that has already finished.
+  *
+  * <p>The result carries, in this order: job id, job name, job status, error message,
+  * create time, finish time, job DAG, an empty plugin-jar list and the job metrics.
+  *
+  * @param finishedJobState final state of the job; must not be null
+  * @param finishedJobMetrics metrics of the job; when null an empty metrics object is emitted
+  * @param finishedJobDAGInfo DAG of the job; when null an empty DAG object is emitted
+  * @return the JSON object describing the finished job
+  */
+ private JsonObject convertToJson(
+         JobState finishedJobState,
+         JobMetrics finishedJobMetrics,
+         JobDAGInfo finishedJobDAGInfo) {
+     // A single formatter serves both timestamps. It is method-local, so the fact that
+     // SimpleDateFormat is not thread-safe does not matter here.
+     SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+     JsonObject jobInfoJson = new JsonObject();
+     jobInfoJson
+             .add(RestConstant.JOB_ID, String.valueOf(finishedJobState.getJobId()))
+             .add(RestConstant.JOB_NAME, finishedJobState.getJobName())
+             .add(RestConstant.JOB_STATUS, finishedJobState.getJobStatus().toString())
+             .add(RestConstant.ERROR_MSG, finishedJobState.getErrorMessage())
+             .add(
+                     RestConstant.CREATE_TIME,
+                     timeFormat.format(new Date(finishedJobState.getSubmitTime())))
+             .add(
+                     RestConstant.FINISH_TIME,
+                     timeFormat.format(new Date(finishedJobState.getFinishTime())))
+             // The caller looks up state, DAG and metrics in three separate maps, so the DAG
+             // or the metrics can be missing even though the state exists. Emit an empty
+             // object in that case instead of failing the whole request.
+             .add(
+                     RestConstant.JOB_DAG,
+                     finishedJobDAGInfo == null
+                             ? new JsonObject()
+                             : Json.parse(JsonUtils.toJsonString(finishedJobDAGInfo)).asObject())
+             // NOTE(review): always an empty array for finished jobs; kept so the response
+             // shape does not change. Confirm whether clients rely on this key.
+             .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
+             .add(
+                     RestConstant.METRICS,
+                     finishedJobMetrics == null
+                             ? new JsonObject()
+                             : JsonUtil.toJsonObject(
+                                     getJobMetrics(finishedJobMetrics.toJsonString())));
+     return jobInfoJson;
+ }
+
private JsonObject convertToJson(
JobState jobState, String jobMetrics, JsonObject jobDAGInfo, long
jobId) {
JsonObject jobInfoJson = new JsonObject();