This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new c0880eee [Improvement] [Seatunnel-web] Add API to get job execution
status. (#197)
c0880eee is described below
commit c0880eeefd1b3f2242c3d42b799ddd5f15c5ce47
Author: Mohammad Arshad <[email protected]>
AuthorDate: Thu Aug 29 07:33:46 2024 +0530
[Improvement] [Seatunnel-web] Add API to get job execution status. (#197)
---
.../app/controller/JobExecutorController.java | 20 ++++++++
.../seatunnel/app/dal/dao/IJobInstanceDao.java | 2 +
.../app/dal/dao/impl/JobInstanceDaoImpl.java | 5 ++
.../app/dal/mapper/JobInstanceMapper.java | 2 +
.../response/executor/JobExecutionStatus.java} | 25 +++++-----
.../app/service/ITaskInstanceService.java | 6 +++
.../app/service/impl/TaskInstanceServiceImpl.java | 56 ++++++++++++++++++++--
.../seatunnel/app/dal/mapper/JobInstanceMapper.xml | 5 ++
.../controller/JobExecutorControllerWrapper.java | 16 +++++++
.../app/test/JobExecutorControllerTest.java | 40 +++++++++++-----
10 files changed, 149 insertions(+), 28 deletions(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
index d330f375..44fe7d95 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
@@ -18,10 +18,13 @@
package org.apache.seatunnel.app.controller;
import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import org.apache.seatunnel.app.service.IJobExecutorService;
import org.apache.seatunnel.app.service.IJobInstanceService;
+import org.apache.seatunnel.app.service.ITaskInstanceService;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;
@@ -48,6 +51,7 @@ public class JobExecutorController {
@Resource IJobExecutorService jobExecutorService;
@Resource private IJobInstanceService jobInstanceService;
+ @Resource private ITaskInstanceService<SeaTunnelJobInstanceDto>
taskInstanceService;
@PostMapping("/execute")
@ApiOperation(value = "Execute synchronization tasks", httpMethod = "POST")
@@ -88,4 +92,20 @@ public class JobExecutorController {
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
return jobExecutorService.jobStore(userId, jobInstanceId);
}
+
+ @GetMapping("/status")
+ @ApiOperation(value = "get job execution status", httpMethod = "GET")
+ Result<JobExecutionStatus> getJobExecutionStatus(
+ @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
+ @ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
+ return taskInstanceService.getJobExecutionStatus(userId,
jobInstanceId);
+ }
+
+ @GetMapping("/detail")
+ @ApiOperation(value = "get job execution status and some more details",
httpMethod = "GET")
+ Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(
+ @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
+ @ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
+ return taskInstanceService.getJobExecutionDetail(userId,
jobInstanceId);
+ }
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
index f4c5b8f1..44026af8 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
@@ -47,4 +47,6 @@ public interface IJobInstanceDao {
String jobMode);
List<JobInstance> getAllJobInstance(@NonNull List<Long> jobInstanceIdList);
+
+ JobInstance getJobExecutionStatus(@NonNull Long jobInstanceId);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
index 4eff1e18..02b529bf 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
@@ -88,4 +88,9 @@ public class JobInstanceDaoImpl implements IJobInstanceDao {
return jobInstances;
}
+
+ @Override
+ public JobInstance getJobExecutionStatus(@NonNull Long jobInstanceId) {
+ return jobInstanceMapper.getJobExecutionStatus(jobInstanceId);
+ }
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
index 1ad784df..53b60a40 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
@@ -37,4 +37,6 @@ public interface JobInstanceMapper extends
BaseMapper<JobInstance> {
@Param("endTime") Date endTime,
@Param("jobDefineId") Long jobDefineId,
@Param("jobMode") String jobMode);
+
+ JobInstance getJobExecutionStatus(@Param("jobInstanceId") Long
jobInstanceId);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java
similarity index 61%
copy from
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
copy to
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java
index 77f2d115..833abd88 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java
@@ -15,21 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.app.service;
+package org.apache.seatunnel.app.domain.response.executor;
-import org.apache.seatunnel.app.common.Result;
-import org.apache.seatunnel.app.utils.PageInfo;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-public interface ITaskInstanceService<T> {
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class JobExecutionStatus {
- Result<PageInfo<T>> getSyncTaskInstancePaging(
- Integer userId,
- String jobDefineName,
- String executorName,
- String stateType,
- String startTime,
- String endTime,
- String syncTaskType,
- Integer pageNo,
- Integer pageSize);
+ private String jobStatus;
+
+ private String errorMessage;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
index 77f2d115..ee74b244 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.app.service;
import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
import org.apache.seatunnel.app.utils.PageInfo;
public interface ITaskInstanceService<T> {
@@ -32,4 +34,8 @@ public interface ITaskInstanceService<T> {
String syncTaskType,
Integer pageNo,
Integer pageSize);
+
+ Result<JobExecutionStatus> getJobExecutionStatus(Integer userId, long
jobInstanceId);
+
+ Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(Integer userId, long
jobInstanceId);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
index aa89c85c..0f7a2266 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
@@ -22,7 +22,9 @@ import org.apache.seatunnel.app.common.Status;
import org.apache.seatunnel.app.dal.dao.IJobDefinitionDao;
import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
import org.apache.seatunnel.app.dal.entity.JobDefinition;
+import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
import org.apache.seatunnel.app.domain.response.metrics.JobSummaryMetricsRes;
import org.apache.seatunnel.app.service.BaseService;
import org.apache.seatunnel.app.service.IJobDefinitionService;
@@ -41,6 +43,7 @@ import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -103,15 +106,20 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
if (CollectionUtils.isEmpty(records)) {
return result;
}
- addJobDefineNameToResult(records);
- addRunningTimeToResult(records);
- jobPipelineSummaryMetrics(records, syncTaskType, userId);
+ populateExecutionMetricsData(userId, syncTaskType, records);
pageInfo.setTotal((int) jobInstanceIPage.getTotal());
pageInfo.setTotalList(records);
result.setData(pageInfo);
return result;
}
+ private void populateExecutionMetricsData(
+ Integer userId, String syncTaskType, List<SeaTunnelJobInstanceDto>
records) {
+ addJobDefineNameToResult(records);
+ addRunningTimeToResult(records);
+ jobPipelineSummaryMetrics(records, syncTaskType, userId);
+ }
+
private void addRunningTimeToResult(List<SeaTunnelJobInstanceDto> records)
{
for (SeaTunnelJobInstanceDto jobInstanceDto : records) {
long runningTime = 0l;
@@ -187,4 +195,46 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
}
}
}
+
+ @Override
+ public Result<JobExecutionStatus> getJobExecutionStatus(Integer userId,
long jobInstanceId) {
+ JobInstance jobInstance =
jobInstanceDao.getJobExecutionStatus(jobInstanceId);
+ if (jobInstance == null) {
+ return Result.failure(404, "Job instance not found");
+ }
+ return Result.success(
+ new JobExecutionStatus(jobInstance.getJobStatus(),
jobInstance.getErrorMessage()));
+ }
+
+ @Override
+ public Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(
+ Integer userId, long jobInstanceId) {
+ JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
+ if (jobInstance == null) {
+ return Result.failure(404, "Job instance not found");
+ }
+ SeaTunnelJobInstanceDto executionDetails = convertToDto(jobInstance);
+ populateExecutionMetricsData(
+ userId, jobInstance.getJobType(),
Collections.singletonList(executionDetails));
+ return Result.success(executionDetails);
+ }
+
+ private SeaTunnelJobInstanceDto convertToDto(JobInstance jobInstance) {
+ SeaTunnelJobInstanceDto dto = new SeaTunnelJobInstanceDto();
+ dto.setId(jobInstance.getId());
+ dto.setJobDefineId(jobInstance.getJobDefineId());
+ dto.setJobStatus(jobInstance.getJobStatus());
+ dto.setJobConfig(jobInstance.getJobConfig());
+ dto.setEngineName(jobInstance.getEngineName());
+ dto.setEngineVersion(jobInstance.getEngineVersion());
+ dto.setJobEngineId(jobInstance.getJobEngineId());
+ dto.setCreateUserId(jobInstance.getCreateUserId());
+ dto.setUpdateUserId(jobInstance.getUpdateUserId());
+ dto.setCreateTime(jobInstance.getCreateTime());
+ dto.setUpdateTime(jobInstance.getUpdateTime());
+ dto.setEndTime(jobInstance.getEndTime());
+ dto.setJobType(jobInstance.getJobType());
+ dto.setErrorMessage(jobInstance.getErrorMessage());
+ return dto;
+ }
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
index ee61b70c..55c44037 100644
---
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
+++
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
@@ -53,4 +53,9 @@
</where>
ORDER BY create_time DESC
</select>
+ <select id="getJobExecutionStatus"
resultType="org.apache.seatunnel.app.dal.entity.JobInstance">
+ SELECT `job_status`, `error_message`
+ FROM t_st_job_instance t
+ WHERE t.id = #{jobInstanceId}
+ </select>
</mapper>
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
index c7732548..f78fae34 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
@@ -18,7 +18,9 @@ package org.apache.seatunnel.app.controller;
import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import org.apache.seatunnel.app.utils.JSONTestUtils;
import org.apache.seatunnel.app.utils.JSONUtils;
@@ -63,4 +65,18 @@ public class JobExecutorControllerWrapper extends
SeatunnelWebTestingBase {
sendRequest(urlWithParam("job/executor/restore?jobInstanceId="
+ jobInstanceId));
return JSONTestUtils.parseObject(response, Result.class);
}
+
+ public Result<JobExecutionStatus> getJobExecutionStatus(Long
jobInstanceId) {
+ String response =
+ sendRequest(urlWithParam("job/executor/status?jobInstanceId="
+ jobInstanceId));
+ return JSONTestUtils.parseObject(
+ response, new TypeReference<Result<JobExecutionStatus>>() {});
+ }
+
+ public Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(Long
jobInstanceId) {
+ String response =
+ sendRequest(urlWithParam("job/executor/detail?jobInstanceId="
+ jobInstanceId));
+ return JSONTestUtils.parseObject(
+ response, new TypeReference<Result<SeaTunnelJobInstanceDto>>()
{});
+ }
}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 8d20497b..50a4e419 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -21,15 +21,16 @@ import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
import
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
-import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.app.domain.request.job.PluginConfig;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -53,7 +54,6 @@ public class JobExecutorControllerTest {
private static final String uniqueId = "_" + System.currentTimeMillis();
private static SeatunnelDatasourceControllerWrapper
seatunnelDatasourceControllerWrapper;
private static JobControllerWrapper jobControllerWrapper;
- private static TaskInstanceControllerWrapper taskInstanceControllerWrapper;
@BeforeAll
public static void setUp() {
@@ -61,7 +61,6 @@ public class JobExecutorControllerTest {
jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
seatunnelDatasourceControllerWrapper = new
SeatunnelDatasourceControllerWrapper();
jobControllerWrapper = new JobControllerWrapper();
- taskInstanceControllerWrapper = new TaskInstanceControllerWrapper();
}
@Test
@@ -277,10 +276,19 @@ public class JobExecutorControllerTest {
// Fails because of the wrong database credentials.
assertFalse(result.isSuccess());
// Even though job failed but job instance is created into the
database.
- assertTrue(result.getData() > 0);
- SeaTunnelJobInstanceDto taskInstanceList =
- taskInstanceControllerWrapper.getTaskInstanceList(jobName);
- assertNotNull(taskInstanceList.getErrorMessage());
+ Long jobInstanceId = result.getData();
+ assertTrue(jobInstanceId > 0);
+ Result<JobExecutionStatus> jobExecutionStatusResult =
+
jobExecutorControllerWrapper.getJobExecutionStatus(jobInstanceId);
+ assertTrue(jobExecutionStatusResult.isSuccess());
+ assertEquals(JobStatus.FAILED.name(),
jobExecutionStatusResult.getData().getJobStatus());
+ assertNotNull(jobExecutionStatusResult.getData().getErrorMessage());
+
+ // Invalid jobInstanceId
+ Result<JobExecutionStatus> jobExecutionStatusResult2 =
+ jobExecutorControllerWrapper.getJobExecutionStatus(123L);
+ assertFalse(jobExecutionStatusResult2.isSuccess());
+ assertEquals(404, jobExecutionStatusResult2.getCode());
}
@Test
@@ -291,12 +299,22 @@ public class JobExecutorControllerTest {
// job submitted successfully but it will fail during execution
assertTrue(result.isSuccess());
assertTrue(result.getData() > 0);
- JobUtils.waitForJobCompletion(result.getData());
+ Long jobInstanceId = result.getData();
+ JobUtils.waitForJobCompletion(jobInstanceId);
// extra second to let the data get updated in the database
Thread.sleep(2000);
- SeaTunnelJobInstanceDto taskInstanceList =
- taskInstanceControllerWrapper.getTaskInstanceList(jobName);
- assertNotNull(taskInstanceList.getErrorMessage());
+ Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult =
+
jobExecutorControllerWrapper.getJobExecutionDetail(jobInstanceId);
+ assertTrue(jobExecutionDetailResult.isSuccess());
+ assertEquals(JobStatus.FAILED.name(),
jobExecutionDetailResult.getData().getJobStatus());
+ assertNotNull(jobExecutionDetailResult.getData().getErrorMessage());
+ assertNotNull(jobExecutionDetailResult.getData().getJobDefineName());
+
+ // Invalid jobInstanceId
+ Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult2 =
+ jobExecutorControllerWrapper.getJobExecutionDetail(123L);
+ assertFalse(jobExecutionDetailResult2.isSuccess());
+ assertEquals(404, jobExecutionDetailResult2.getCode());
}
@AfterAll