This is an automated email from the ASF dual-hosted git repository. zhongjiajie pushed a commit to branch 3.0.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 072672a12771b77d69e3bd2247395ad6fca6d3f6 Author: Wenjun Ruan <[email protected]> AuthorDate: Tue Aug 2 09:30:18 2022 +0800 Fix recovery from failed task will dead loop (#11239) (cherry picked from commit 04f3aa97135d79469daf7c21c935029faff827b2) --- .../server/master/runner/WorkflowExecuteRunnable.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 9413707dcd..005a41d26e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -82,7 +82,6 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; @@ -1820,12 +1819,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { // todo: Can we use a better way to set the recover taskInstanceId list? rather then use the cmdParam if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) { - String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA); - if (ArrayUtils.isNotEmpty(idList)) { - List<Integer> taskInstanceIds = Arrays.stream(idList) - .map(Integer::valueOf) - .collect(Collectors.toList()); - return processService.findTaskInstanceByIdList(taskInstanceIds); + List<Integer> startTaskInstanceIds = Arrays.stream(paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING) + .split(COMMA)) + .filter(StringUtils::isNotEmpty) + .map(Integer::valueOf) + .collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) { + return processService.findTaskInstanceByIdList(startTaskInstanceIds); } } return Collections.emptyList();
