This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 0a1e24d5 [Improvement] [Seatunnel-web] Add support to provide reason for job failure (#196)
0a1e24d5 is described below
commit 0a1e24d52004139a2453fc7f7abff5b83982d965
Author: Mohammad Arshad <[email protected]>
AuthorDate: Wed Aug 28 11:23:17 2024 +0530
[Improvement] [Seatunnel-web] Add support to provide reason for job failure (#196)
---
README.md | 6 ++
.../seatunnel/app/dal/entity/JobInstance.java | 3 +
.../seatunnel/app/service/IJobInstanceService.java | 6 +-
.../app/service/impl/JobExecutorServiceImpl.java | 18 ++--
.../app/service/impl/JobInstanceServiceImpl.java | 33 ++-----
.../seatunnel/app/utils/JobExecParamUtil.java | 12 +++
.../seatunnel/app/dal/mapper/JobInstanceMapper.xml | 3 +-
.../main/resources/script/seatunnel_server_h2.sql | 1 +
.../resources/script/seatunnel_server_mysql.sql | 1 +
.../app/controller/JobTaskControllerWrapper.java | 16 +++-
.../controller/TaskInstanceControllerWrapper.java | 100 +++++++++++++++++++++
.../app/test/JobExecutorControllerTest.java | 23 +++++
.../app/test/TaskInstanceControllerTest.java | 73 +++++++++++++++
.../org/apache/seatunnel/app/utils/JobUtils.java | 19 +++-
seatunnel-web-it/src/test/resources/hazelcast.yaml | 10 +--
15 files changed, 277 insertions(+), 47 deletions(-)
diff --git a/README.md b/README.md
index 0eae9fc6..8f516523 100644
--- a/README.md
+++ b/README.md
@@ -206,3 +206,9 @@ Now ,let me show you how to use it.
#### Virtual Tables manage

+
+### Upgrades
+#### 1. Upgrade from 1.0.1 or before to 1.0.2 or after.
+Execute the following SQL to upgrade the database:
+
+```ALTER TABLE `t_st_job_instance` ADD COLUMN `error_message` varchar(4096) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL;```
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
index edcb2f7e..db5cbeba 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
@@ -73,4 +73,7 @@ public class JobInstance {
@TableField("job_type")
private String jobType;
+
+ @TableField("error_message")
+ private String errorMessage;
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
index e6db1d84..e971452e 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.app.dal.entity.JobLine;
import org.apache.seatunnel.app.dal.entity.JobTask;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
+import org.apache.seatunnel.engine.core.job.JobResult;
import lombok.NonNull;
@@ -40,5 +41,8 @@ public interface IJobInstanceService {
JobExecutorRes getExecuteResource(@NonNull Long jobEngineId);
void complete(
- @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId);
+ @NonNull Integer userId,
+ @NonNull Long jobInstanceId,
+ @NonNull String jobEngineId,
+ JobResult jobResult);
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index b8ca731e..e7490347 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.app.service.IJobInstanceService;
import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
import
org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
+import org.apache.seatunnel.app.utils.JobExecParamUtil;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
@@ -37,6 +38,7 @@ import
org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
+import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
@@ -128,6 +130,9 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
JobInstance jobInstance =
jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobStatus(JobStatus.FAILED.name());
jobInstance.setEndTime(new Date());
+ String jobInstanceErrorMessage =
+ JobExecParamUtil.getJobInstanceErrorMessage(e.getMessage());
+ jobInstance.setErrorMessage(jobInstanceErrorMessage);
jobInstanceDao.update(jobInstance);
throw new RuntimeException(e.getMessage(), e);
}
@@ -152,21 +157,22 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
String jobEngineId,
SeaTunnelClient seaTunnelClient) {
ExecutorService executor = Executors.newFixedThreadPool(1);
- CompletableFuture<JobStatus> future =
- CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete, executor);
+ CompletableFuture<JobResult> future =
+ CompletableFuture.supplyAsync(clientJobProxy::waitForJobCompleteV2, executor);
+ JobResult jobResult = new JobResult(JobStatus.FAILED, "");
try {
- log.info("future.get before");
- JobStatus jobStatus = future.get();
-
+ jobResult = future.get();
executor.shutdown();
} catch (InterruptedException e) {
+ jobResult.setError(e.getMessage());
throw new RuntimeException(e);
} catch (ExecutionException e) {
+ jobResult.setError(e.getMessage());
throw new RuntimeException(e);
} finally {
seaTunnelClient.close();
log.info("and jobInstanceService.complete begin");
- jobInstanceService.complete(userId, jobInstanceId, jobEngineId);
+ jobInstanceService.complete(userId, jobInstanceId, jobEngineId, jobResult);
}
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index 7a05da7e..68f7ee00 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -52,7 +52,6 @@ import
org.apache.seatunnel.app.domain.request.job.transform.Transform;
import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions;
import
org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
-import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
import
org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
import org.apache.seatunnel.app.service.IDatasourceService;
import org.apache.seatunnel.app.service.IJobInstanceService;
@@ -64,7 +63,7 @@ import org.apache.seatunnel.app.utils.JobExecParamUtil;
import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;
@@ -361,34 +360,18 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
@Override
public void complete(
- @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId) {
+ @NonNull Integer userId,
+ @NonNull Long jobInstanceId,
+ @NonNull String jobEngineId,
+ JobResult jobResult) {
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE,
userId);
JobInstance jobInstance =
jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId);
jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId);
-
- List<JobPipelineSummaryMetricsRes> status =
- jobMetricsService.getJobPipelineSummaryMetrics(userId,
jobInstanceId);
-
- String jobStatus;
- Set<String> statusList =
- status.stream()
- .map(JobPipelineSummaryMetricsRes::getStatus)
- .map(String::toUpperCase)
- .collect(Collectors.toSet());
- if (statusList.size() == 1 && statusList.contains("FINISHED")) {
- jobStatus = JobStatus.FINISHED.name();
- } else if (statusList.contains("FAILED")) {
- jobStatus = JobStatus.FAILED.name();
- } else if (statusList.contains("CANCELED")) {
- jobStatus = JobStatus.CANCELED.name();
- } else if (statusList.contains("CANCELLING")) {
- jobStatus = JobStatus.CANCELING.name();
- } else {
- jobStatus = JobStatus.RUNNING.name();
- }
- jobInstance.setJobStatus(jobStatus);
+ jobInstance.setJobStatus(jobResult.getStatus().name());
jobInstance.setJobEngineId(jobEngineId);
jobInstance.setUpdateUserId(userId);
+ jobInstance.setErrorMessage(
+ JobExecParamUtil.getJobInstanceErrorMessage(jobResult.getError()));
jobInstanceDao.update(jobInstance);
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
index 5e236849..497524df 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
@@ -27,6 +27,18 @@ import java.util.Map;
public class JobExecParamUtil {
+ // The maximum length of the job execution error message, 4KB
+ private static final int ERROR_MESSAGE_MAX_LENGTH = 4096;
+
+ public static String getJobInstanceErrorMessage(String message) {
+ if (message == null) {
+ return null;
+ }
+ return message.length() > ERROR_MESSAGE_MAX_LENGTH
+ ? message.substring(0, ERROR_MESSAGE_MAX_LENGTH)
+ : message;
+ }
+
public static Config updateEnvConfig(JobExecParam jobExecParam, Config
envConfig) {
if (jobExecParam == null || jobExecParam.getEnv() == null) {
return envConfig;
diff --git a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
index 60c2ae11..ee61b70c 100644
--- a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
+++ b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
@@ -24,10 +24,11 @@
<result column="engine_name" jdbcType="VARCHAR" property="engineName"/>
<result column="engine_version" jdbcType="VARCHAR"
property="engineVersion"/>
<result column="job_engine_id" jdbcType="VARCHAR"
property="jobEngineId"/>
+ <result column="error_message" jdbcType="VARCHAR" property="errorMessage"/>
</resultMap>
<sql id="Base_Column_List">
id
- , `job_define_id`, `job_status`, `job_config`, `engine_name`, `engine_version`, `job_engine_id`
+ , `job_define_id`, `job_status`, `job_config`, `engine_name`, `engine_version`, `job_engine_id`,`error_message`
</sql>
<select id="queryJobInstanceListPaging"
resultType="org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto">
diff --git a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_h2.sql b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_h2.sql
index d74a0bdc..c85527f2 100644
--- a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_h2.sql
+++ b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_h2.sql
@@ -99,6 +99,7 @@ CREATE TABLE t_st_job_instance (
update_time TIMESTAMP(3) NOT NULL DEFAULT
CURRENT_TIMESTAMP(3),
end_time TIMESTAMP(3) DEFAULT NULL,
job_type VARCHAR(50) NOT NULL,
+ error_message VARCHAR(4096) DEFAULT NULL,
PRIMARY KEY (id)
);
diff --git a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
index 4f6663d6..2fd44da2 100644
--- a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
+++ b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
@@ -107,6 +107,7 @@ CREATE TABLE `t_st_job_instance` (
`update_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE
CURRENT_TIMESTAMP(3),
`end_time` timestamp(3) NULL DEFAULT NULL,
`job_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
+ `error_message` varchar(4096) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT =
Dynamic;
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
index a89581ca..28cb6ba9 100644
--- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
@@ -71,7 +71,7 @@ public class JobTaskControllerWrapper extends
SeatunnelWebTestingBase {
return JSONTestUtils.parseObject(response, Result.class);
}
- public String createFakeSourcePlugin(String datasourceId, long jobVersionId) {
+ public String createFakeSourcePlugin(String datasourceId, long jobVersionId, String rows) {
DataSourceOption tableOption = new DataSourceOption();
tableOption.setDatabases(Arrays.asList("fake_database"));
tableOption.setTables(Arrays.asList("fake_table"));
@@ -88,7 +88,9 @@ public class JobTaskControllerWrapper extends
SeatunnelWebTestingBase {
.dataSourceId(Long.parseLong(datasourceId))
.sceneMode(SceneMode.SINGLE_TABLE)
.config(
-
"{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name =
\\\"string\\\"\\n age = \\\"int\\\"\\n
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.templat
[...]
+
"{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name =
\\\"string\\\"\\n age = \\\"int\\\"\\n
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.templat
[...]
+ + rows
+ +
"\",\"row.num\":5,\"split.num\":1,\"split.read-interval\":1,\"map.size\":5,\"array.size\":5,\"bytes.length\":5,\"date.year.template\":\"\",\"date.month.template\":\"\",\"date.day.template\":\"\",\"time.hour.template\":\"\",\"time.minute.template\":\"\",\"time.second.template\":\"\",\"parallelism\":1}")
.build();
Result<Void> srcResult = saveSingleTask(jobVersionId,
sourcePluginConfig);
@@ -96,6 +98,16 @@ public class JobTaskControllerWrapper extends
SeatunnelWebTestingBase {
return sourcePluginId;
}
+ public String createFakeSourcePlugin(String datasourceId, long jobVersionId) {
+ return createFakeSourcePlugin(datasourceId, jobVersionId, "");
+ }
+
+ public String createFakeSourcePluginThatFails(String datasourceId, long jobVersionId) {
+ String rows =
+ "[{kind=INSERT, fields=[\"org\", 100]}, {kind=INSERT, fields=[\"apache\", 50]}, {kind=INSERT, fields=[\"seatunnel\", 25]}, {kind=INSERT, fields=[\"seatunnel-web\", 12]}, {kind=INSERT, fields=[\"etl\", 6_age_invalid_number]}]";
+ return createFakeSourcePlugin(datasourceId, jobVersionId, rows);
+ }
+
public String createConsoleSinkPlugin(String datasourceId, long
jobVersionId) {
DataSourceOption sinkTableOption = new DataSourceOption();
sinkTableOption.setDatabases(Arrays.asList("console_fake_database"));
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java
new file mode 100644
index 00000000..41cece13
--- /dev/null
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.PageInfo;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TaskInstanceControllerWrapper extends SeatunnelWebTestingBase {
+
+ private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(
+ String jobDefineName,
+ String executorName,
+ String stateType,
+ String startTime,
+ String endTime,
+ String syncTaskType,
+ Integer pageNo,
+ Integer pageSize) {
+ String response =
+ sendRequest(
+ urlWithParam(
+ "task/jobMetrics?jobDefineName="
+ + jobDefineName
+ + "&executorName="
+ + executorName
+ + "&stateType="
+ + stateType
+ + "&startDate="
+ + startTime
+ + "&endDate="
+ + endTime
+ + "&syncTaskType="
+ + syncTaskType
+ + "&pageNo="
+ + pageNo
+ + "&pageSize="
+ + pageSize));
+ return JSONTestUtils.parseObject(
+ response, new TypeReference<Result<PageInfo<SeaTunnelJobInstanceDto>>>() {});
+ }
+
+ public SeaTunnelJobInstanceDto getTaskInstanceList(String jobDefineName) {
+ String startTime =
+ URLEncoder.encode(
+ dateFormat.format(
+ new Date(System.currentTimeMillis() - 1000 * 60 * 60 * 24)));
+ String endTime =
+ URLEncoder.encode(
+ dateFormat.format(
+ new Date(System.currentTimeMillis() + 1000 * 60 * 60 * 24)));
+ String syncTaskType = "BATCH";
+ Integer pageNo = 1;
+ Integer pageSize = 10;
+ Result<PageInfo<SeaTunnelJobInstanceDto>> result =
+ getTaskInstanceList(
+ jobDefineName,
+ null,
+ null,
+ startTime,
+ endTime,
+ syncTaskType,
+ pageNo,
+ pageSize);
+ assertTrue(result.isSuccess());
+ if (result.getData().getTotalList().isEmpty()) {
+ return null;
+ }
+ assertEquals(1, result.getData().getTotalList().size());
+ return result.getData().getTotalList().get(0);
+ }
+}
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 3af81eb0..8d20497b 100644
--- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
import
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
+import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
@@ -51,6 +53,7 @@ public class JobExecutorControllerTest {
private static final String uniqueId = "_" + System.currentTimeMillis();
private static SeatunnelDatasourceControllerWrapper
seatunnelDatasourceControllerWrapper;
private static JobControllerWrapper jobControllerWrapper;
+ private static TaskInstanceControllerWrapper taskInstanceControllerWrapper;
@BeforeAll
public static void setUp() {
@@ -58,6 +61,7 @@ public class JobExecutorControllerTest {
jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
seatunnelDatasourceControllerWrapper = new
SeatunnelDatasourceControllerWrapper();
jobControllerWrapper = new JobControllerWrapper();
+ taskInstanceControllerWrapper = new TaskInstanceControllerWrapper();
}
@Test
@@ -274,6 +278,25 @@ public class JobExecutorControllerTest {
assertFalse(result.isSuccess());
// Even though job failed but job instance is created into the
database.
assertTrue(result.getData() > 0);
+ SeaTunnelJobInstanceDto taskInstanceList =
+ taskInstanceControllerWrapper.getTaskInstanceList(jobName);
+ assertNotNull(taskInstanceList.getErrorMessage());
+ }
+
+ @Test
+ public void storeErrorMessageWhenJobFailed() throws InterruptedException {
+ String jobName = "failureCause" + uniqueId;
+ long jobVersionId = JobUtils.createJob(jobName, true);
+ Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+ // job submitted successfully but it will fail during execution
+ assertTrue(result.isSuccess());
+ assertTrue(result.getData() > 0);
+ JobUtils.waitForJobCompletion(result.getData());
+ // extra second to let the data get updated in the database
+ Thread.sleep(2000);
+ SeaTunnelJobInstanceDto taskInstanceList =
+ taskInstanceControllerWrapper.getTaskInstanceList(jobName);
+ assertNotNull(taskInstanceList.getErrorMessage());
}
@AfterAll
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
new file mode 100644
index 00000000..08d4aacd
--- /dev/null
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
+import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+import org.apache.seatunnel.app.utils.JobUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TaskInstanceControllerTest extends SeatunnelWebTestingBase {
+
+ private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
+ private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
+ private static TaskInstanceControllerWrapper taskInstanceControllerWrapper;
+ private static final String uniqueId = "_" + System.currentTimeMillis();
+
+ @BeforeAll
+ public static void setUp() {
+ seaTunnelWebCluster.start();
+ jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
+ taskInstanceControllerWrapper = new TaskInstanceControllerWrapper();
+ }
+
+ @Test
+ public void getTaskInstanceList_shouldReturnData_whenValidRequest() {
+ String jobName = "getTaskInstance" + uniqueId;
+ long jobVersionId = JobUtils.createJob(jobName);
+ Result<Long> execuitonResult = jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+ assertTrue(execuitonResult.isSuccess());
+ Result<List<JobPipelineDetailMetricsRes>> listResult =
+ JobUtils.waitForJobCompletion(execuitonResult.getData());
+ assertEquals(1, listResult.getData().size());
+ assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+
+ SeaTunnelJobInstanceDto taskInstanceList =
+ taskInstanceControllerWrapper.getTaskInstanceList(jobName);
+ assertNotNull(taskInstanceList);
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ seaTunnelWebCluster.stop();
+ }
+}
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
index a9214baf..2cce6317 100644
--- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
@@ -109,6 +109,10 @@ public class JobUtils {
}
public static Long createJob(String jobName) {
+ return createJob(jobName, false);
+ }
+
+ public static Long createJob(String jobName, boolean shouldExecutionFail) {
Long jobId =
jobDefinitionControllerWrapper.createJobDefinition(jobName);
JobConfig jobConfig =
jobConfigControllerWrapper.populateJobConfigObject(jobName);
// jobVersionId is same as jobId
@@ -124,9 +128,18 @@ public class JobUtils {
String consoleDatasourceId =
seatunnelDatasourceControllerWrapper.createConsoleDatasource("console_" +
jobName);
- String sourcePluginId =
- jobTaskControllerWrapper.createFakeSourcePlugin(
- fakeSourceDatasourceId, jobVersionId);
+ String sourcePluginId;
+ if (shouldExecutionFail) {
+ sourcePluginId =
+ jobTaskControllerWrapper.createFakeSourcePluginThatFails(
+ fakeSourceDatasourceId, jobVersionId);
+
+ } else {
+ sourcePluginId =
+ jobTaskControllerWrapper.createFakeSourcePlugin(
+ fakeSourceDatasourceId, jobVersionId);
+ }
+
String transPluginId =
jobTaskControllerWrapper.createReplaceTransformPlugin(jobVersionId);
String sinkPluginId =
jobTaskControllerWrapper.createConsoleSinkPlugin(consoleDatasourceId,
jobVersionId);
diff --git a/seatunnel-web-it/src/test/resources/hazelcast.yaml b/seatunnel-web-it/src/test/resources/hazelcast.yaml
index 7ac730e0..48ed2641 100644
--- a/seatunnel-web-it/src/test/resources/hazelcast.yaml
+++ b/seatunnel-web-it/src/test/resources/hazelcast.yaml
@@ -35,13 +35,5 @@ hazelcast:
port: 5901
properties:
hazelcast.invocation.max.retry.count: 20
- hazelcast.tcp.join.port.try.count: 30
- hazelcast.logging.type: log4j2
- hazelcast.operation.generic.thread.count: 50
- hazelcast.heartbeat.failuredetector.type: phi-accrual
- hazelcast.heartbeat.interval.seconds: 2
- hazelcast.max.no.heartbeat.seconds: 180
- hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
- hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
- hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
+