This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.1-prepare by this push:
new d7eab83 [cherry-pick-7221/7232] pick some api bug (#7234)
d7eab83 is described below
commit d7eab830a74f5db35a50d6ae1ffd23cb4320fe9f
Author: JinYong Li <[email protected]>
AuthorDate: Tue Dec 7 16:12:59 2021 +0800
[cherry-pick-7221/7232] pick some api bug (#7234)
* fix worker group display (#7208)
* [Bug] [API] queryProcessDefinitionByCode bug (#7221)
* pick-7232/7221
---
.../api/controller/SchedulerController.java | 6 +++--
.../service/impl/TaskDefinitionServiceImpl.java | 31 +++++++++++++++++++---
.../service/impl/WorkFlowLineageServiceImpl.java | 3 +++
.../api/service/TaskDefinitionServiceImplTest.java | 16 ++++++++---
.../service/process/ProcessService.java | 12 ++++++---
5 files changed, 56 insertions(+), 12 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
index e1edfea..0e1ca4d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
@@ -148,6 +148,7 @@ public class SchedulerController extends BaseController {
@ApiImplicitParam(name = "environmentCode", value =
"ENVIRONMENT_CODE", dataType = "Long"),
})
@PutMapping("/{id}")
+ @ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_SCHEDULE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result updateSchedule(@ApiIgnore @RequestAttribute(value =
SESSION_USER) User loginUser,
@@ -330,11 +331,12 @@ public class SchedulerController extends BaseController {
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", type =
"WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID",
dataType = "Int", example = "100"),
@ApiImplicitParam(name = "failureStrategy", value =
"FAILURE_STRATEGY", type = "FailureStrategy"),
- @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID",
dataType = "Int", example = "100"),
+ @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP",
dataType = "String", example = "default"),
@ApiImplicitParam(name = "processInstancePriority", value =
"PROCESS_INSTANCE_PRIORITY", type = "Priority"),
@ApiImplicitParam(name = "environmentCode", value =
"ENVIRONMENT_CODE", dataType = "Long"),
})
- @PutMapping("/{code}")
+ @PutMapping("/update/{code}")
+ @ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_SCHEDULE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result updateScheduleByProcessDefinitionCode(@ApiIgnore
@RequestAttribute(value = SESSION_USER) User loginUser,
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index c7c5953..c5f40c5 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -206,6 +206,19 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
int delete = taskDefinitionMapper.deleteByCode(taskCode);
if (delete > 0) {
+ List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
+ if (!processTaskRelationList.isEmpty()) {
+ int deleteRelation = 0;
+ int deleteRelationLog = 0;
+ for (ProcessTaskRelation processTaskRelation :
taskRelationList) {
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
+ deleteRelation +=
processTaskRelationMapper.deleteRelation(processTaskRelationLog);
+ deleteRelationLog +=
processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+ }
+ if ((deleteRelation & deleteRelationLog) == 0) {
+ throw new
ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+ }
+ }
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
@@ -492,6 +505,7 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
* @param releaseState releaseState
* @return update result code
*/
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> releaseTaskDefinition(User loginUser, long
projectCode, long code, ReleaseState releaseState) {
Project project = projectMapper.queryByCode(projectCode);
@@ -510,11 +524,15 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
return result;
}
-
+ TaskDefinitionLog taskDefinitionLog =
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code,
taskDefinition.getVersion());
+ if (taskDefinitionLog == null) {
+ putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
+ return result;
+ }
switch (releaseState) {
case OFFLINE:
taskDefinition.setFlag(Flag.NO);
- taskDefinitionMapper.updateById(taskDefinition);
+ taskDefinitionLog.setFlag(Flag.NO);
break;
case ONLINE:
String resourceIds = taskDefinition.getResourceIds();
@@ -530,13 +548,18 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
}
taskDefinition.setFlag(Flag.YES);
- taskDefinitionMapper.updateById(taskDefinition);
+ taskDefinitionLog.setFlag(Flag.NO);
break;
default:
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
RELEASESTATE);
return result;
}
-
+ int update = taskDefinitionMapper.updateById(taskDefinition);
+ int updateLog = taskDefinitionLogMapper.updateById(taskDefinitionLog);
+ if ((update == 0 && updateLog == 1) || (update == 1 && updateLog ==
0)) {
+ putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
+ throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
+ }
putMsg(result, Status.SUCCESS);
return result;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index d665a1a..1367de7 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -177,6 +177,9 @@ public class WorkFlowLineageServiceImpl extends
BaseServiceImpl implements WorkF
private Set<Long> querySourceWorkFlowCodes(long projectCode, long
workFlowCode, List<TaskDefinition> taskDefinitionList) {
Set<Long> sourceWorkFlowCodes = new HashSet<>();
+ if (taskDefinitionList == null || taskDefinitionList.isEmpty()) {
+ return sourceWorkFlowCodes;
+ }
List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList);
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (taskDefinitionLog.getProjectCode() == projectCode) {
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 94f2ad5..d8852b4 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -207,9 +207,10 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode,
version))
.thenReturn(new TaskDefinitionLog());
-
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setProjectCode(projectCode);
Mockito.when(taskDefinitionMapper.queryByCode(taskCode))
- .thenReturn(new TaskDefinition());
+ .thenReturn(taskDefinition);
Mockito.when(taskDefinitionMapper.updateById(new
TaskDefinitionLog())).thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.switchVersion(loginUser, projectCode, taskCode, version);
@@ -306,7 +307,16 @@ public class TaskDefinitionServiceImplTest {
// process definition offline
putMsg(result, Status.SUCCESS);
-
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(new
TaskDefinition());
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setProjectCode(projectCode);
+ taskDefinition.setVersion(1);
+ taskDefinition.setCode(taskCode);
+ String params =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
+ taskDefinition.setTaskParams(params);
+ taskDefinition.setTaskType(TaskType.SHELL.getDesc());
+
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
+ TaskDefinitionLog taskDefinitionLog = new
TaskDefinitionLog(taskDefinition);
+
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode,
taskDefinition.getVersion())).thenReturn(taskDefinitionLog);
Map<String, Object> offlineTaskResult =
taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode,
ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS,
offlineTaskResult.get(Constants.STATUS));
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 2dcfd2d..df7549c 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -126,9 +126,9 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
-import com.facebook.presto.jdbc.internal.guava.collect.Lists;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
/**
* process relative dao that some mappers in this.
@@ -330,7 +330,7 @@ public class ProcessService {
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(defineCode);
if (processDefinition == null) {
logger.error("process define not exists");
- return new ArrayList<>();
+ return Lists.newArrayList();
}
List<ProcessTaskRelationLog> processTaskRelations =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(),
processDefinition.getVersion());
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
@@ -339,8 +339,11 @@ public class ProcessService {
taskDefinitionSet.add(new
TaskDefinition(processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion()));
}
}
+ if (taskDefinitionSet.isEmpty()) {
+ return Lists.newArrayList();
+ }
List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
- return new ArrayList<>(taskDefinitionLogs);
+ return Lists.newArrayList(taskDefinitionLogs);
}
/**
@@ -2367,6 +2370,9 @@ public class ProcessService {
taskDefinitionSet.add(new
TaskDefinition(processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion()));
}
}
+ if (taskDefinitionSet.isEmpty()) {
+ return Lists.newArrayList();
+ }
return
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}