This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 809870a60f [Improve][RestAPI] return finished job info when job is 
finished (#6576)
809870a60f is described below

commit 809870a60ff0d852468e795b61c4dfd281bb7551
Author: Jarvis <[email protected]>
AuthorDate: Wed Mar 27 10:23:24 2024 +0800

    [Improve][RestAPI] return finished job info when job is finished (#6576)
---
 docs/en/seatunnel-engine/rest-api.md               | 18 ++++---
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 36 ++++++++++++++
 .../server/rest/RestHttpGetCommandProcessor.java   | 56 ++++++++++++++++++++++
 3 files changed, 104 insertions(+), 6 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api.md 
b/docs/en/seatunnel-engine/rest-api.md
index 4a56c7da7e..3f1069b566 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -92,8 +92,6 @@ network:
   "jobId": "",
   "jobName": "",
   "jobStatus": "",
-  "envOptions": {
-  },
   "createTime": "",
   "jobDag": {
     "vertices": [
@@ -101,16 +99,24 @@ network:
     "edges": [
     ]
   },
-  "pluginJarsUrls": [
-  ],
-  "isStartWithSavePoint": false,
   "metrics": {
     "sourceReceivedCount": "",
     "sinkWriteCount": ""
-  }
+  },
+  "finishedTime": "",
+  "errorMsg": null,
+  "envOptions": {
+  },
+  "pluginJarsUrls": [
+  ],
+  "isStartWithSavePoint": false
 }
 ```
 
+`jobId`, `jobName`, `jobStatus`, `createTime`, `jobDag`, `metrics` always be 
returned.
+`envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` will return when job is 
running.
+`finishedTime`, `errorMsg` will return when job is finished.
+
 When we can't get the job info, the response will be:
 
 ```json
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 5427a8e1c2..c7be274ad2 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -52,6 +52,8 @@ public class RestApiIT {
 
     private static ClientJobProxy clientJobProxy;
 
+    private static ClientJobProxy batchJobProxy;
+
     private static HazelcastInstanceImpl node1;
 
     private static HazelcastInstanceImpl node2;
@@ -85,6 +87,19 @@ public class RestApiIT {
                         () ->
                                 Assertions.assertEquals(
                                         JobStatus.RUNNING, 
clientJobProxy.getJobStatus()));
+
+        String batchFilePath = 
TestUtils.getResource("fakesource_to_console.conf");
+        JobConfig batchConf = new JobConfig();
+        batchConf.setName("fake_to_console");
+        ClientJobExecutionEnvironment batchJobExecutionEnv =
+                engineClient.createExecutionContext(batchFilePath, batchConf, 
seaTunnelConfig);
+        batchJobProxy = batchJobExecutionEnv.execute();
+        Awaitility.await()
+                .atMost(5, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.FINISHED, 
batchJobProxy.getJobStatus()));
     }
 
     @Test
@@ -108,6 +123,27 @@ public class RestApiIT {
                         });
     }
 
+    @Test
+    public void testGetJobById() {
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            given().get(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + 
RestConstant.RUNNING_JOB_URL
+                                                    + "/"
+                                                    + batchJobProxy.getJobId())
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobName", 
equalTo("fake_to_console"))
+                                    .body("jobStatus", equalTo("FINISHED"));
+                        });
+    }
+
     @Test
     public void testGetAnNotExistJobById() {
         Arrays.asList(node2, node1)
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 81a1047c74..79f29575a1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -272,8 +272,36 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                                 .getMap(Constant.IMAP_RUNNING_JOB_INFO)
                                 .get(Long.valueOf(jobId));
 
+        JobState finishedJobState =
+                (JobState)
+                        this.textCommandService
+                                .getNode()
+                                .getNodeEngine()
+                                .getHazelcastInstance()
+                                .getMap(Constant.IMAP_FINISHED_JOB_STATE)
+                                .get(Long.valueOf(jobId));
         if (!jobId.isEmpty() && jobInfo != null) {
             this.prepareResponse(command, convertToJson(jobInfo, 
Long.parseLong(jobId)));
+        } else if (!jobId.isEmpty() && finishedJobState != null) {
+            JobMetrics finishedJobMetrics =
+                    (JobMetrics)
+                            this.textCommandService
+                                    .getNode()
+                                    .getNodeEngine()
+                                    .getHazelcastInstance()
+                                    .getMap(Constant.IMAP_FINISHED_JOB_METRICS)
+                                    .get(Long.valueOf(jobId));
+            JobDAGInfo finishedJobDAGInfo =
+                    (JobDAGInfo)
+                            this.textCommandService
+                                    .getNode()
+                                    .getNodeEngine()
+                                    .getHazelcastInstance()
+                                    
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO)
+                                    .get(Long.valueOf(jobId));
+            this.prepareResponse(
+                    command,
+                    convertToJson(finishedJobState, finishedJobMetrics, 
finishedJobDAGInfo));
         } else {
             this.prepareResponse(command, new 
JsonObject().add(RestConstant.JOB_ID, jobId));
         }
@@ -411,6 +439,34 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
         return jobInfoJson;
     }
 
+    private JsonObject convertToJson(
+            JobState finishedJobState,
+            JobMetrics finishedJobMetrics,
+            JobDAGInfo finishedJobDAGInfo) {
+        JsonObject jobInfoJson = new JsonObject();
+        jobInfoJson
+                .add(RestConstant.JOB_ID, 
String.valueOf(finishedJobState.getJobId()))
+                .add(RestConstant.JOB_NAME, finishedJobState.getJobName())
+                .add(RestConstant.JOB_STATUS, 
finishedJobState.getJobStatus().toString())
+                .add(RestConstant.ERROR_MSG, 
finishedJobState.getErrorMessage())
+                .add(
+                        RestConstant.CREATE_TIME,
+                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                                .format(new 
Date(finishedJobState.getSubmitTime())))
+                .add(
+                        RestConstant.FINISH_TIME,
+                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                                .format(new 
Date(finishedJobState.getFinishTime())))
+                .add(
+                        RestConstant.JOB_DAG,
+                        
Json.parse(JsonUtils.toJsonString(finishedJobDAGInfo)).asObject())
+                .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
+                .add(
+                        RestConstant.METRICS,
+                        
JsonUtil.toJsonObject(getJobMetrics(finishedJobMetrics.toJsonString())));
+        return jobInfoJson;
+    }
+
     private JsonObject convertToJson(
             JobState jobState, String jobMetrics, JsonObject jobDAGInfo, long 
jobId) {
         JsonObject jobInfoJson = new JsonObject();

Reply via email to