This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a1e24d5 [Improvement] [Seatunnel-web] Add support to provide reason 
for job failure (#196)
0a1e24d5 is described below

commit 0a1e24d52004139a2453fc7f7abff5b83982d965
Author: Mohammad Arshad <[email protected]>
AuthorDate: Wed Aug 28 11:23:17 2024 +0530

    [Improvement] [Seatunnel-web] Add support to provide reason for job failure 
(#196)
---
 README.md                                          |   6 ++
 .../seatunnel/app/dal/entity/JobInstance.java      |   3 +
 .../seatunnel/app/service/IJobInstanceService.java |   6 +-
 .../app/service/impl/JobExecutorServiceImpl.java   |  18 ++--
 .../app/service/impl/JobInstanceServiceImpl.java   |  33 ++-----
 .../seatunnel/app/utils/JobExecParamUtil.java      |  12 +++
 .../seatunnel/app/dal/mapper/JobInstanceMapper.xml |   3 +-
 .../main/resources/script/seatunnel_server_h2.sql  |   1 +
 .../resources/script/seatunnel_server_mysql.sql    |   1 +
 .../app/controller/JobTaskControllerWrapper.java   |  16 +++-
 .../controller/TaskInstanceControllerWrapper.java  | 100 +++++++++++++++++++++
 .../app/test/JobExecutorControllerTest.java        |  23 +++++
 .../app/test/TaskInstanceControllerTest.java       |  73 +++++++++++++++
 .../org/apache/seatunnel/app/utils/JobUtils.java   |  19 +++-
 seatunnel-web-it/src/test/resources/hazelcast.yaml |  10 +--
 15 files changed, 277 insertions(+), 47 deletions(-)

diff --git a/README.md b/README.md
index 0eae9fc6..8f516523 100644
--- a/README.md
+++ b/README.md
@@ -206,3 +206,9 @@ Now ,let me show you how to use it.
 
 #### Virtual Tables manage
 ![img.png](docs/images/VirtualImage.png)
+
+### Upgrades
+#### 1. Upgrade from 1.0.1 or before to 1.0.2 or after.
+Execute the following SQL to upgrade the database:
+
+```ALTER TABLE `t_st_job_instance` ADD COLUMN `error_message` varchar(4096) 
CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL;```
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
index edcb2f7e..db5cbeba 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
@@ -73,4 +73,7 @@ public class JobInstance {
 
     @TableField("job_type")
     private String jobType;
+
+    @TableField("error_message")
+    private String errorMessage;
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
index e6db1d84..e971452e 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.app.dal.entity.JobLine;
 import org.apache.seatunnel.app.dal.entity.JobTask;
 import org.apache.seatunnel.app.domain.request.job.JobExecParam;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
+import org.apache.seatunnel.engine.core.job.JobResult;
 
 import lombok.NonNull;
 
@@ -40,5 +41,8 @@ public interface IJobInstanceService {
     JobExecutorRes getExecuteResource(@NonNull Long jobEngineId);
 
     void complete(
-            @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull 
String jobEngineId);
+            @NonNull Integer userId,
+            @NonNull Long jobInstanceId,
+            @NonNull String jobEngineId,
+            JobResult jobResult);
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index b8ca731e..e7490347 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.app.service.IJobInstanceService;
 import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
 import 
org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
 import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
+import org.apache.seatunnel.app.utils.JobExecParamUtil;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
@@ -37,6 +38,7 @@ import 
org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
+import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
 
@@ -128,6 +130,9 @@ public class JobExecutorServiceImpl implements 
IJobExecutorService {
             JobInstance jobInstance = 
jobInstanceDao.getJobInstance(jobInstanceId);
             jobInstance.setJobStatus(JobStatus.FAILED.name());
             jobInstance.setEndTime(new Date());
+            String jobInstanceErrorMessage =
+                    
JobExecParamUtil.getJobInstanceErrorMessage(e.getMessage());
+            jobInstance.setErrorMessage(jobInstanceErrorMessage);
             jobInstanceDao.update(jobInstance);
             throw new RuntimeException(e.getMessage(), e);
         }
@@ -152,21 +157,22 @@ public class JobExecutorServiceImpl implements 
IJobExecutorService {
             String jobEngineId,
             SeaTunnelClient seaTunnelClient) {
         ExecutorService executor = Executors.newFixedThreadPool(1);
-        CompletableFuture<JobStatus> future =
-                
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete, executor);
+        CompletableFuture<JobResult> future =
+                
CompletableFuture.supplyAsync(clientJobProxy::waitForJobCompleteV2, executor);
+        JobResult jobResult = new JobResult(JobStatus.FAILED, "");
         try {
-            log.info("future.get before");
-            JobStatus jobStatus = future.get();
-
+            jobResult = future.get();
             executor.shutdown();
         } catch (InterruptedException e) {
+            jobResult.setError(e.getMessage());
             throw new RuntimeException(e);
         } catch (ExecutionException e) {
+            jobResult.setError(e.getMessage());
             throw new RuntimeException(e);
         } finally {
             seaTunnelClient.close();
             log.info("and jobInstanceService.complete begin");
-            jobInstanceService.complete(userId, jobInstanceId, jobEngineId);
+            jobInstanceService.complete(userId, jobInstanceId, jobEngineId, 
jobResult);
         }
     }
 
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index 7a05da7e..68f7ee00 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -52,7 +52,6 @@ import 
org.apache.seatunnel.app.domain.request.job.transform.Transform;
 import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions;
 import 
org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
-import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
 import 
org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
 import org.apache.seatunnel.app.service.IDatasourceService;
 import org.apache.seatunnel.app.service.IJobInstanceService;
@@ -64,7 +63,7 @@ import org.apache.seatunnel.app.utils.JobExecParamUtil;
 import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.server.common.CodeGenerateUtils;
 import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
 import org.apache.seatunnel.server.common.SeatunnelException;
@@ -361,34 +360,18 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
 
     @Override
     public void complete(
-            @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull 
String jobEngineId) {
+            @NonNull Integer userId,
+            @NonNull Long jobInstanceId,
+            @NonNull String jobEngineId,
+            JobResult jobResult) {
         
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, 
userId);
         JobInstance jobInstance = 
jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId);
         jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId);
-
-        List<JobPipelineSummaryMetricsRes> status =
-                jobMetricsService.getJobPipelineSummaryMetrics(userId, 
jobInstanceId);
-
-        String jobStatus;
-        Set<String> statusList =
-                status.stream()
-                        .map(JobPipelineSummaryMetricsRes::getStatus)
-                        .map(String::toUpperCase)
-                        .collect(Collectors.toSet());
-        if (statusList.size() == 1 && statusList.contains("FINISHED")) {
-            jobStatus = JobStatus.FINISHED.name();
-        } else if (statusList.contains("FAILED")) {
-            jobStatus = JobStatus.FAILED.name();
-        } else if (statusList.contains("CANCELED")) {
-            jobStatus = JobStatus.CANCELED.name();
-        } else if (statusList.contains("CANCELLING")) {
-            jobStatus = JobStatus.CANCELING.name();
-        } else {
-            jobStatus = JobStatus.RUNNING.name();
-        }
-        jobInstance.setJobStatus(jobStatus);
+        jobInstance.setJobStatus(jobResult.getStatus().name());
         jobInstance.setJobEngineId(jobEngineId);
         jobInstance.setUpdateUserId(userId);
+        jobInstance.setErrorMessage(
+                
JobExecParamUtil.getJobInstanceErrorMessage(jobResult.getError()));
         jobInstanceDao.update(jobInstance);
     }
 
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
index 5e236849..497524df 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
@@ -27,6 +27,18 @@ import java.util.Map;
 
 public class JobExecParamUtil {
 
+    // The maximum length of the job execution error message, 4KB
+    private static final int ERROR_MESSAGE_MAX_LENGTH = 4096;
+
+    public static String getJobInstanceErrorMessage(String message) {
+        if (message == null) {
+            return null;
+        }
+        return message.length() > ERROR_MESSAGE_MAX_LENGTH
+                ? message.substring(0, ERROR_MESSAGE_MAX_LENGTH)
+                : message;
+    }
+
     public static Config updateEnvConfig(JobExecParam jobExecParam, Config 
envConfig) {
         if (jobExecParam == null || jobExecParam.getEnv() == null) {
             return envConfig;
diff --git 
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
 
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
index 60c2ae11..ee61b70c 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
+++ 
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
@@ -24,10 +24,11 @@
         <result column="engine_name" jdbcType="VARCHAR" property="engineName"/>
         <result column="engine_version" jdbcType="VARCHAR" 
property="engineVersion"/>
         <result column="job_engine_id" jdbcType="VARCHAR" 
property="jobEngineId"/>
+        <result column="error_message" jdbcType="VARCHAR" 
property="errorMessage"/>
     </resultMap>
     <sql id="Base_Column_List">
         id
-        , `job_define_id`, `job_status`, `job_config`, `engine_name`, 
`engine_version`, `job_engine_id`
+        , `job_define_id`, `job_status`, `job_config`, `engine_name`, 
`engine_version`, `job_engine_id`,`error_message`
     </sql>
 
     <select id="queryJobInstanceListPaging" 
resultType="org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto">
diff --git 
a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_h2.sql
 
b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_h2.sql
index d74a0bdc..c85527f2 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_h2.sql
+++ 
b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_h2.sql
@@ -99,6 +99,7 @@ CREATE TABLE t_st_job_instance (
                                    update_time TIMESTAMP(3) NOT NULL DEFAULT 
CURRENT_TIMESTAMP(3),
                                    end_time TIMESTAMP(3) DEFAULT NULL,
                                    job_type VARCHAR(50) NOT NULL,
+                                   error_message VARCHAR(4096) DEFAULT NULL,
                                    PRIMARY KEY (id)
 );
 
diff --git 
a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
 
b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
index 4f6663d6..2fd44da2 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
+++ 
b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
@@ -107,6 +107,7 @@ CREATE TABLE `t_st_job_instance`  (
   `update_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE 
CURRENT_TIMESTAMP(3),
   `end_time` timestamp(3) NULL DEFAULT NULL,
   `job_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
+  `error_message` varchar(4096) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL 
DEFAULT NULL,
   PRIMARY KEY (`id`) USING BTREE
 ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = 
Dynamic;
 
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
index a89581ca..28cb6ba9 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
@@ -71,7 +71,7 @@ public class JobTaskControllerWrapper extends 
SeatunnelWebTestingBase {
         return JSONTestUtils.parseObject(response, Result.class);
     }
 
-    public String createFakeSourcePlugin(String datasourceId, long 
jobVersionId) {
+    public String createFakeSourcePlugin(String datasourceId, long 
jobVersionId, String rows) {
         DataSourceOption tableOption = new DataSourceOption();
         tableOption.setDatabases(Arrays.asList("fake_database"));
         tableOption.setTables(Arrays.asList("fake_table"));
@@ -88,7 +88,9 @@ public class JobTaskControllerWrapper extends 
SeatunnelWebTestingBase {
                         .dataSourceId(Long.parseLong(datasourceId))
                         .sceneMode(SceneMode.SINGLE_TABLE)
                         .config(
-                                
"{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n        name = 
\\\"string\\\"\\n        age = \\\"int\\\"\\n      
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.templat
 [...]
+                                
"{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n        name = 
\\\"string\\\"\\n        age = \\\"int\\\"\\n      
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.templat
 [...]
+                                        + rows
+                                        + 
"\",\"row.num\":5,\"split.num\":1,\"split.read-interval\":1,\"map.size\":5,\"array.size\":5,\"bytes.length\":5,\"date.year.template\":\"\",\"date.month.template\":\"\",\"date.day.template\":\"\",\"time.hour.template\":\"\",\"time.minute.template\":\"\",\"time.second.template\":\"\",\"parallelism\":1}")
                         .build();
 
         Result<Void> srcResult = saveSingleTask(jobVersionId, 
sourcePluginConfig);
@@ -96,6 +98,16 @@ public class JobTaskControllerWrapper extends 
SeatunnelWebTestingBase {
         return sourcePluginId;
     }
 
+    public String createFakeSourcePlugin(String datasourceId, long 
jobVersionId) {
+        return createFakeSourcePlugin(datasourceId, jobVersionId, "");
+    }
+
+    public String createFakeSourcePluginThatFails(String datasourceId, long 
jobVersionId) {
+        String rows =
+                "[{kind=INSERT, fields=[\"org\", 100]}, {kind=INSERT, 
fields=[\"apache\", 50]}, {kind=INSERT, fields=[\"seatunnel\", 25]}, 
{kind=INSERT, fields=[\"seatunnel-web\", 12]}, {kind=INSERT, fields=[\"etl\", 
6_age_invalid_number]}]";
+        return createFakeSourcePlugin(datasourceId, jobVersionId, rows);
+    }
+
     public String createConsoleSinkPlugin(String datasourceId, long 
jobVersionId) {
         DataSourceOption sinkTableOption = new DataSourceOption();
         sinkTableOption.setDatabases(Arrays.asList("console_fake_database"));
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java
new file mode 100644
index 00000000..41cece13
--- /dev/null
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.PageInfo;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TaskInstanceControllerWrapper extends SeatunnelWebTestingBase {
+
+    private static SimpleDateFormat dateFormat = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(
+            String jobDefineName,
+            String executorName,
+            String stateType,
+            String startTime,
+            String endTime,
+            String syncTaskType,
+            Integer pageNo,
+            Integer pageSize) {
+        String response =
+                sendRequest(
+                        urlWithParam(
+                                "task/jobMetrics?jobDefineName="
+                                        + jobDefineName
+                                        + "&executorName="
+                                        + executorName
+                                        + "&stateType="
+                                        + stateType
+                                        + "&startDate="
+                                        + startTime
+                                        + "&endDate="
+                                        + endTime
+                                        + "&syncTaskType="
+                                        + syncTaskType
+                                        + "&pageNo="
+                                        + pageNo
+                                        + "&pageSize="
+                                        + pageSize));
+        return JSONTestUtils.parseObject(
+                response, new 
TypeReference<Result<PageInfo<SeaTunnelJobInstanceDto>>>() {});
+    }
+
+    public SeaTunnelJobInstanceDto getTaskInstanceList(String jobDefineName) {
+        String startTime =
+                URLEncoder.encode(
+                        dateFormat.format(
+                                new Date(System.currentTimeMillis() - 1000 * 
60 * 60 * 24)));
+        String endTime =
+                URLEncoder.encode(
+                        dateFormat.format(
+                                new Date(System.currentTimeMillis() + 1000 * 
60 * 60 * 24)));
+        String syncTaskType = "BATCH";
+        Integer pageNo = 1;
+        Integer pageSize = 10;
+        Result<PageInfo<SeaTunnelJobInstanceDto>> result =
+                getTaskInstanceList(
+                        jobDefineName,
+                        null,
+                        null,
+                        startTime,
+                        endTime,
+                        syncTaskType,
+                        pageNo,
+                        pageSize);
+        assertTrue(result.isSuccess());
+        if (result.getData().getTotalList().isEmpty()) {
+            return null;
+        }
+        assertEquals(1, result.getData().getTotalList().size());
+        return result.getData().getTotalList().get(0);
+    }
+}
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 3af81eb0..8d20497b 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
 import org.apache.seatunnel.app.controller.JobControllerWrapper;
 import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
 import 
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
+import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
 import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
 import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
 import org.apache.seatunnel.app.domain.request.job.JobExecParam;
@@ -51,6 +53,7 @@ public class JobExecutorControllerTest {
     private static final String uniqueId = "_" + System.currentTimeMillis();
     private static SeatunnelDatasourceControllerWrapper 
seatunnelDatasourceControllerWrapper;
     private static JobControllerWrapper jobControllerWrapper;
+    private static TaskInstanceControllerWrapper taskInstanceControllerWrapper;
 
     @BeforeAll
     public static void setUp() {
@@ -58,6 +61,7 @@ public class JobExecutorControllerTest {
         jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
         seatunnelDatasourceControllerWrapper = new 
SeatunnelDatasourceControllerWrapper();
         jobControllerWrapper = new JobControllerWrapper();
+        taskInstanceControllerWrapper = new TaskInstanceControllerWrapper();
     }
 
     @Test
@@ -274,6 +278,25 @@ public class JobExecutorControllerTest {
         assertFalse(result.isSuccess());
         // Even though job failed but job instance is created into the 
database.
         assertTrue(result.getData() > 0);
+        SeaTunnelJobInstanceDto taskInstanceList =
+                taskInstanceControllerWrapper.getTaskInstanceList(jobName);
+        assertNotNull(taskInstanceList.getErrorMessage());
+    }
+
+    @Test
+    public void storeErrorMessageWhenJobFailed() throws InterruptedException {
+        String jobName = "failureCause" + uniqueId;
+        long jobVersionId = JobUtils.createJob(jobName, true);
+        Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+        // job submitted successfully but it will fail during execution
+        assertTrue(result.isSuccess());
+        assertTrue(result.getData() > 0);
+        JobUtils.waitForJobCompletion(result.getData());
+        // extra second to let the data get updated in the database
+        Thread.sleep(2000);
+        SeaTunnelJobInstanceDto taskInstanceList =
+                taskInstanceControllerWrapper.getTaskInstanceList(jobName);
+        assertNotNull(taskInstanceList.getErrorMessage());
     }
 
     @AfterAll
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
new file mode 100644
index 00000000..08d4aacd
--- /dev/null
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
+import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+import org.apache.seatunnel.app.utils.JobUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TaskInstanceControllerTest extends SeatunnelWebTestingBase {
+
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new 
SeaTunnelWebCluster();
+    private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
+    private static TaskInstanceControllerWrapper taskInstanceControllerWrapper;
+    private static final String uniqueId = "_" + System.currentTimeMillis();
+
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
+        taskInstanceControllerWrapper = new TaskInstanceControllerWrapper();
+    }
+
+    @Test
+    public void getTaskInstanceList_shouldReturnData_whenValidRequest() {
+        String jobName = "getTaskInstance" + uniqueId;
+        long jobVersionId = JobUtils.createJob(jobName);
+        Result<Long> execuitonResult = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+        assertTrue(execuitonResult.isSuccess());
+        Result<List<JobPipelineDetailMetricsRes>> listResult =
+                JobUtils.waitForJobCompletion(execuitonResult.getData());
+        assertEquals(1, listResult.getData().size());
+        assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+
+        SeaTunnelJobInstanceDto taskInstanceList =
+                taskInstanceControllerWrapper.getTaskInstanceList(jobName);
+        assertNotNull(taskInstanceList);
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        seaTunnelWebCluster.stop();
+    }
+}
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
index a9214baf..2cce6317 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
@@ -109,6 +109,10 @@ public class JobUtils {
     }
 
     public static Long createJob(String jobName) {
+        return createJob(jobName, false);
+    }
+
+    public static Long createJob(String jobName, boolean shouldExecutionFail) {
         Long jobId = 
jobDefinitionControllerWrapper.createJobDefinition(jobName);
         JobConfig jobConfig = 
jobConfigControllerWrapper.populateJobConfigObject(jobName);
         // jobVersionId is same as jobId
@@ -124,9 +128,18 @@ public class JobUtils {
         String consoleDatasourceId =
                 
seatunnelDatasourceControllerWrapper.createConsoleDatasource("console_" + 
jobName);
 
-        String sourcePluginId =
-                jobTaskControllerWrapper.createFakeSourcePlugin(
-                        fakeSourceDatasourceId, jobVersionId);
+        String sourcePluginId;
+        if (shouldExecutionFail) {
+            sourcePluginId =
+                    jobTaskControllerWrapper.createFakeSourcePluginThatFails(
+                            fakeSourceDatasourceId, jobVersionId);
+
+        } else {
+            sourcePluginId =
+                    jobTaskControllerWrapper.createFakeSourcePlugin(
+                            fakeSourceDatasourceId, jobVersionId);
+        }
+
         String transPluginId = 
jobTaskControllerWrapper.createReplaceTransformPlugin(jobVersionId);
         String sinkPluginId =
                 
jobTaskControllerWrapper.createConsoleSinkPlugin(consoleDatasourceId, 
jobVersionId);
diff --git a/seatunnel-web-it/src/test/resources/hazelcast.yaml 
b/seatunnel-web-it/src/test/resources/hazelcast.yaml
index 7ac730e0..48ed2641 100644
--- a/seatunnel-web-it/src/test/resources/hazelcast.yaml
+++ b/seatunnel-web-it/src/test/resources/hazelcast.yaml
@@ -35,13 +35,5 @@ hazelcast:
       port: 5901
   properties:
     hazelcast.invocation.max.retry.count: 20
-    hazelcast.tcp.join.port.try.count: 30
-    hazelcast.logging.type: log4j2
-    hazelcast.operation.generic.thread.count: 50
-    hazelcast.heartbeat.failuredetector.type: phi-accrual
-    hazelcast.heartbeat.interval.seconds: 2
-    hazelcast.max.no.heartbeat.seconds: 180
-    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
-    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
-    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
+
 

Reply via email to