This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 76d059810a Forbid force success of a task instance in a running
workflow instance (#15855)
76d059810a is described below
commit 76d059810a928c3a122df3ee917cb5eedaf60063
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Apr 16 16:19:57 2024 +0800
Forbid force success of a task instance in a running workflow instance
(#15855)
---
.../api/controller/TaskInstanceController.java | 9 +-
.../controller/v2/TaskInstanceV2Controller.java | 4 +-
.../api/service/TaskInstanceService.java | 6 +-
.../api/service/impl/TaskInstanceServiceImpl.java | 62 +++++------
.../dolphinscheduler/api/AssertionsHelper.java | 8 ++
.../api/controller/TaskInstanceControllerTest.java | 4 +-
.../v2/TaskInstanceV2ControllerTest.java | 8 +-
.../api/service/TaskInstanceServiceTest.java | 119 +++++++++++----------
.../service/process/ProcessService.java | 2 +-
.../service/process/ProcessServiceImpl.java | 13 +--
10 files changed, 115 insertions(+), 120 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
index 1865dfee5d..e0055595c9 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
@@ -153,10 +153,11 @@ public class TaskInstanceController extends
BaseController {
@ResponseStatus(HttpStatus.OK)
@ApiException(FORCE_TASK_SUCCESS_ERROR)
@OperatorLog(auditType = AuditType.TASK_INSTANCE_FORCE_SUCCESS)
- public Result forceTaskSuccess(@Parameter(hidden = true)
@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @Schema(name = "projectCode", required =
true) @PathVariable long projectCode,
- @PathVariable(value = "id") Integer id) {
- return taskInstanceService.forceTaskSuccess(loginUser, projectCode,
id);
+ public Result<Void> forceTaskSuccess(@Parameter(hidden = true)
@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @Schema(name = "projectCode",
required = true) @PathVariable long projectCode,
+ @PathVariable(value = "id") Integer
id) {
+ taskInstanceService.forceTaskSuccess(loginUser, projectCode, id);
+ return Result.success();
}
/**
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java
index 3e3a87681b..ea767f0fa3 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java
@@ -167,8 +167,8 @@ public class TaskInstanceV2Controller extends
BaseController {
public TaskInstanceSuccessResponse forceTaskSuccess(@Parameter(hidden =
true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name =
"projectCode", description = "PROJECT_CODE", required = true) @PathVariable
long projectCode,
@PathVariable(value =
"id") Integer id) {
- Result result = taskInstanceService.forceTaskSuccess(loginUser,
projectCode, id);
- return new TaskInstanceSuccessResponse(result);
+ taskInstanceService.forceTaskSuccess(loginUser, projectCode, id);
+ return new TaskInstanceSuccessResponse(Result.success());
}
/**
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
index 86e5396dbe..bff0518685 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
@@ -72,9 +72,9 @@ public interface TaskInstanceService {
* @param taskInstanceId task instance id
* @return the result code and msg
*/
- Result forceTaskSuccess(User loginUser,
- long projectCode,
- Integer taskInstanceId);
+ void forceTaskSuccess(User loginUser,
+ long projectCode,
+ Integer taskInstanceId);
/**
* task savepoint
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index f06f8115a9..7469d8db13 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -23,6 +23,7 @@ import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import
org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
@@ -33,14 +34,15 @@ import
org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
@@ -107,6 +109,9 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
@Autowired
private TaskGroupQueueService taskGroupQueueService;
+ @Autowired
+ private ProcessInstanceDao workflowInstanceDao;
+
/**
* query task list by project, process instance, task name, task start
time, task end time, task status, keyword paging
*
@@ -216,58 +221,39 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
*/
@Transactional
@Override
- public Result forceTaskSuccess(User loginUser, long projectCode, Integer
taskInstanceId) {
- Result result = new Result();
- Project project = projectMapper.queryByCode(projectCode);
+ public void forceTaskSuccess(User loginUser, long projectCode, Integer
taskInstanceId) {
// check user access for project
- Map<String, Object> checkResult =
- projectService.checkProjectAndAuth(loginUser, project,
projectCode, FORCED_SUCCESS);
- Status status = (Status) checkResult.get(Constants.STATUS);
- if (status != Status.SUCCESS) {
- putMsg(result, status);
- return result;
- }
+ projectService.checkProjectAndAuthThrowException(loginUser,
projectCode, FORCED_SUCCESS);
- // check whether the task instance can be found
- TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
- if (task == null) {
- log.error("Task instance can not be found, projectCode:{},
taskInstanceId:{}.", projectCode,
- taskInstanceId);
- putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
- return result;
+ TaskInstance task = taskInstanceDao.queryOptionalById(taskInstanceId)
+ .orElseThrow(() -> new
ServiceException(Status.TASK_INSTANCE_NOT_FOUND));
+
+ if (task.getProjectCode() != projectCode) {
+ throw new ServiceException("The task instance is not under the
project: " + projectCode);
}
- TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(task.getTaskCode());
- if (taskDefinition != null && projectCode !=
taskDefinition.getProjectCode()) {
- log.error("Task definition can not be found, projectCode:{},
taskDefinitionCode:{}.", projectCode,
- task.getTaskCode());
- putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstanceId);
- return result;
+ ProcessInstance processInstance =
workflowInstanceDao.queryOptionalById(task.getProcessInstanceId())
+ .orElseThrow(
+ () -> new
ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST,
task.getProcessInstanceId()));
+ if (!processInstance.getState().isFinished()) {
+ throw new ServiceException("The workflow instance is not finished:
" + processInstance.getState()
+ + " cannot force start task instance");
}
// check whether the task instance state type is failure or cancel
if (!task.getState().isFailure() && !task.getState().isKill()) {
- log.warn("{} type task instance can not perform force success,
projectCode:{}, taskInstanceId:{}.",
- task.getState().getDesc(), projectCode, taskInstanceId);
- putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR,
taskInstanceId, task.getState().toString());
- return result;
+ throw new
ServiceException(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId,
task.getState());
}
// change the state of the task instance
task.setState(TaskExecutionStatus.FORCED_SUCCESS);
task.setEndTime(new Date());
int changedNum = taskInstanceMapper.updateById(task);
- if (changedNum > 0) {
-
processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId);
- log.info("Task instance performs force success complete,
projectCode:{}, taskInstanceId:{}", projectCode,
- taskInstanceId);
- putMsg(result, Status.SUCCESS);
- } else {
- log.error("Task instance performs force success complete,
projectCode:{}, taskInstanceId:{}",
- projectCode, taskInstanceId);
- putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
+ if (changedNum <= 0) {
+ throw new ServiceException(Status.FORCE_TASK_SUCCESS_ERROR);
}
- return result;
+ processService.forceProcessInstanceSuccessByTaskInstanceId(task);
+ log.info("Force success task instance:{} success", taskInstanceId);
}
@Override
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java
index d2da5bc638..eae064bb24 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java
@@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.api;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import java.text.MessageFormat;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.function.Executable;
@@ -30,6 +32,12 @@ public class AssertionsHelper extends Assertions {
Assertions.assertEquals(status.getCode(), exception.getCode());
}
+ public static void assertThrowsServiceException(String message, Executable
executable) {
+ ServiceException exception =
Assertions.assertThrows(ServiceException.class, executable);
+
Assertions.assertEquals(MessageFormat.format(Status.INTERNAL_SERVER_ERROR_ARGS.getMsg(),
message),
+ exception.getMessage());
+ }
+
public static void assertDoesNotThrow(Executable executable) {
Assertions.assertDoesNotThrow(executable);
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
index 7ebe5bf757..b58944537b 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
@@ -82,9 +82,7 @@ public class TaskInstanceControllerTest extends
AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("taskInstanceId", "104");
- Result mockResult = new Result();
- putMsg(mockResult, Status.SUCCESS);
- when(taskInstanceService.forceTaskSuccess(any(User.class), anyLong(),
anyInt())).thenReturn(mockResult);
+
Mockito.doNothing().when(taskInstanceService).forceTaskSuccess(any(User.class),
anyLong(), anyInt());
MvcResult mvcResult =
mockMvc.perform(post("/projects/{projectName}/task-instance/force-success",
"cxc_1113")
.header(SESSION_ID, sessionId)
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java
index cc70f200e1..8e76ec1694 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
import
org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceQueryRequest;
+import
org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceSuccessResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -85,12 +86,9 @@ public class TaskInstanceV2ControllerTest extends
AbstractControllerTest {
@Test
public void testForceTaskSuccess() {
- Result mockResult = new Result();
- putMsg(mockResult, Status.SUCCESS);
-
- when(taskInstanceService.forceTaskSuccess(any(), Mockito.anyLong(),
Mockito.anyInt())).thenReturn(mockResult);
+ Mockito.doNothing().when(taskInstanceService).forceTaskSuccess(any(),
Mockito.anyLong(), Mockito.anyInt());
- Result taskResult = taskInstanceV2Controller.forceTaskSuccess(null,
1L, 1);
+ TaskInstanceSuccessResponse taskResult =
taskInstanceV2Controller.forceTaskSuccess(null, 1L, 1);
Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()),
taskResult.getCode());
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
index aca0d80a6f..dd7acb16d5 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
+import static
org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException;
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
import static org.mockito.ArgumentMatchers.any;
@@ -25,7 +26,6 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
-import org.apache.dolphinscheduler.api.ApiApplicationServer;
import
org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
@@ -35,15 +35,16 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -65,7 +66,6 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
-import org.springframework.boot.test.context.SpringBootTest;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -74,7 +74,6 @@ import
com.baomidou.mybatisplus.extension.plugins.pagination.Page;
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
-@SpringBootTest(classes = ApiApplicationServer.class)
public class TaskInstanceServiceTest {
@InjectMocks
@@ -100,6 +99,8 @@ public class TaskInstanceServiceTest {
@Mock
TaskInstanceDao taskInstanceDao;
+ @Mock
+ ProcessInstanceDao workflowInstanceDao;
@Test
public void queryTaskListPaging() {
@@ -324,6 +325,7 @@ public class TaskInstanceServiceTest {
private TaskInstance getTaskInstance() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
+ taskInstance.setProjectCode(1L);
taskInstance.setName("test_task_instance");
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
@@ -343,64 +345,69 @@ public class TaskInstanceServiceTest {
}
@Test
- public void testForceTaskSuccess() {
+ public void testForceTaskSuccess_withNoPermission() {
+ User user = getAdminUser();
+ TaskInstance task = getTaskInstance();
+ doThrow(new
ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
+ .checkProjectAndAuthThrowException(user,
task.getProjectCode(), FORCED_SUCCESS);
+ assertThrowsServiceException(Status.USER_NO_OPERATION_PROJECT_PERM,
+ () -> taskInstanceService.forceTaskSuccess(user,
task.getProjectCode(), task.getId()));
+ }
+
+ @Test
+ public void testForceTaskSuccess_withTaskInstanceNotFound() {
+ User user = getAdminUser();
+ TaskInstance task = getTaskInstance();
+
doNothing().when(projectService).checkProjectAndAuthThrowException(user,
task.getProjectCode(), FORCED_SUCCESS);
+
when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.empty());
+ assertThrowsServiceException(Status.TASK_INSTANCE_NOT_FOUND,
+ () -> taskInstanceService.forceTaskSuccess(user,
task.getProjectCode(), task.getId()));
+ }
+
+ @Test
+ public void testForceTaskSuccess_withWorkflowInstanceNotFound() {
+ User user = getAdminUser();
+ TaskInstance task = getTaskInstance();
+
doNothing().when(projectService).checkProjectAndAuthThrowException(user,
task.getProjectCode(), FORCED_SUCCESS);
+
when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.of(task));
+
when(workflowInstanceDao.queryOptionalById(task.getProcessInstanceId())).thenReturn(Optional.empty());
+
+ assertThrowsServiceException(Status.PROCESS_INSTANCE_NOT_EXIST,
+ () -> taskInstanceService.forceTaskSuccess(user,
task.getProjectCode(), task.getId()));
+ }
+
+ @Test
+ public void testForceTaskSuccess_withWorkflowInstanceNotFinished() {
User user = getAdminUser();
long projectCode = 1L;
- Project project = getProject(projectCode);
- int taskId = 1;
TaskInstance task = getTaskInstance();
+ ProcessInstance processInstance = getProcessInstance();
+ processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
+
doNothing().when(projectService).checkProjectAndAuthThrowException(user,
projectCode, FORCED_SUCCESS);
+
when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.of(task));
+
when(workflowInstanceDao.queryOptionalById(task.getProcessInstanceId()))
+ .thenReturn(Optional.of(processInstance));
- Map<String, Object> mockSuccess = new HashMap<>(5);
- putMsg(mockSuccess, Status.SUCCESS);
- when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+ assertThrowsServiceException(
+ "The workflow instance is not finished: " +
processInstance.getState()
+ + " cannot force start task instance",
+ () -> taskInstanceService.forceTaskSuccess(user, projectCode,
task.getId()));
+ }
- // user auth failed
- Map<String, Object> mockFailure = new HashMap<>(5);
- putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM,
user.getUserName(), projectCode);
- when(projectService.checkProjectAndAuth(user, project, projectCode,
FORCED_SUCCESS)).thenReturn(mockFailure);
- Result authFailRes = taskInstanceService.forceTaskSuccess(user,
projectCode, taskId);
- Assertions.assertNotSame(Status.SUCCESS.getCode(),
authFailRes.getCode());
-
- // test task not found
- when(projectService.checkProjectAndAuth(user, project, projectCode,
FORCED_SUCCESS)).thenReturn(mockSuccess);
- when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null);
- TaskDefinition taskDefinition = new TaskDefinition();
- taskDefinition.setProjectCode(projectCode);
-
when(taskDefinitionMapper.queryByCode(task.getTaskCode())).thenReturn(taskDefinition);
- Result taskNotFoundRes = taskInstanceService.forceTaskSuccess(user,
projectCode, taskId);
- Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(),
taskNotFoundRes.getCode().intValue());
-
- // test task instance state error
- task.setState(TaskExecutionStatus.SUCCESS);
- when(taskInstanceMapper.selectById(1)).thenReturn(task);
- Map<String, Object> result = new HashMap<>();
- putMsg(result, Status.SUCCESS, projectCode);
- when(projectMapper.queryByCode(projectCode)).thenReturn(project);
- when(projectService.checkProjectAndAuth(user, project, projectCode,
FORCED_SUCCESS)).thenReturn(result);
- Result taskStateErrorRes = taskInstanceService.forceTaskSuccess(user,
projectCode, taskId);
-
Assertions.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR.getCode(),
- taskStateErrorRes.getCode().intValue());
-
- // test error
- task.setState(TaskExecutionStatus.FAILURE);
- when(taskInstanceMapper.updateById(task)).thenReturn(0);
- putMsg(result, Status.SUCCESS, projectCode);
- when(projectMapper.queryByCode(projectCode)).thenReturn(project);
- when(projectService.checkProjectAndAuth(user, project, projectCode,
FORCED_SUCCESS)).thenReturn(result);
- Result errorRes = taskInstanceService.forceTaskSuccess(user,
projectCode, taskId);
- Assertions.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR.getCode(),
errorRes.getCode().intValue());
-
- // test success
- task.setState(TaskExecutionStatus.FAILURE);
- task.setEndTime(null);
- when(taskInstanceMapper.updateById(task)).thenReturn(1);
- putMsg(result, Status.SUCCESS, projectCode);
- when(projectMapper.queryByCode(projectCode)).thenReturn(project);
- when(projectService.checkProjectAndAuth(user, project, projectCode,
FORCED_SUCCESS)).thenReturn(result);
- Result successRes = taskInstanceService.forceTaskSuccess(user,
projectCode, taskId);
- Assertions.assertEquals(Status.SUCCESS.getCode(),
successRes.getCode().intValue());
- Assertions.assertNotNull(task.getEndTime());
+ @Test
+ public void testForceTaskSuccess_withTaskInstanceNotFinished() {
+ User user = getAdminUser();
+ TaskInstance task = getTaskInstance();
+ ProcessInstance processInstance = getProcessInstance();
+ processInstance.setState(WorkflowExecutionStatus.FAILURE);
+
doNothing().when(projectService).checkProjectAndAuthThrowException(user,
task.getProjectCode(), FORCED_SUCCESS);
+
when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.of(task));
+
when(workflowInstanceDao.queryOptionalById(task.getProcessInstanceId()))
+ .thenReturn(Optional.of(processInstance));
+ assertThrowsServiceException(
+ Status.TASK_INSTANCE_STATE_OPERATION_ERROR,
+ () -> taskInstanceService.forceTaskSuccess(user,
task.getProjectCode(), task.getId()));
}
@Test
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 101350e90d..8787aabc8b 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -189,7 +189,7 @@ public interface ProcessService {
public String findConfigYamlByName(String clusterName);
- void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
+ void forceProcessInstanceSuccessByTaskInstanceId(TaskInstance
taskInstance);
void saveCommandTrigger(Integer commandId, Integer processInstanceId);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 028ab7651f..3c207ae982 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -276,6 +276,7 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private TriggerRelationService triggerRelationService;
+
/**
* todo: split this method
* handle Command (construct ProcessInstance from Command) , wrapped in
transaction
@@ -621,13 +622,13 @@ public class ProcessServiceImpl implements ProcessService
{
/**
* Get workflow runtime tenant
- *
+ * <p>
* the workflow provides a tenant and uses the provided tenant;
* when no tenant is provided or the provided tenant is the default
tenant, \
* the user's tenant created by the workflow is used
*
* @param tenantCode tenantCode
- * @param userId userId
+ * @param userId userId
* @return tenant code
*/
@Override
@@ -2114,11 +2115,7 @@ public class ProcessServiceImpl implements
ProcessService {
}
@Override
- public void forceProcessInstanceSuccessByTaskInstanceId(Integer
taskInstanceId) {
- TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
- if (task == null) {
- return;
- }
+ public void forceProcessInstanceSuccessByTaskInstanceId(TaskInstance task)
{
ProcessInstance processInstance =
findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null);
if (processInstance != null
&& (processInstance.getState().isFailure() ||
processInstance.getState().isStop())) {
@@ -2139,7 +2136,7 @@ public class ProcessServiceImpl implements ProcessService
{
List<Integer> failTaskList = validTaskList.stream()
.filter(instance -> instance.getState().isFailure() ||
instance.getState().isKill())
.map(TaskInstance::getId).collect(Collectors.toList());
- if (failTaskList.size() == 1 &&
failTaskList.contains(taskInstanceId)) {
+ if (failTaskList.size() == 1 &&
failTaskList.contains(task.getId())) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUCCESS, "success by
task force success");
processInstanceDao.updateById(processInstance);
}