This is an automated email from the ASF dual-hosted git repository. ic4y pushed a commit to branch dev_jobrestart in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit c13d2704f14bd8510f6042c2c7cdeae0847aa2c3 Author: liuli <[email protected]> AuthorDate: Fri Dec 16 19:53:19 2022 +0800 [hotfix][ST-Engine] fix job restart of all node down --- .../engine/server/dag/physical/PhysicalVertex.java | 26 ++++++++++++++++------ 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java index d6bc9f5f5..bb0675e70 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java @@ -181,7 +181,6 @@ public class PhysicalVertex { this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap; } - @SuppressWarnings("checkstyle:MagicNumber") public PassiveCompletableFuture<TaskExecutionState> initStateFuture() { this.taskFuture = new CompletableFuture<>(); ExecutionState executionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation); @@ -194,13 +193,13 @@ public class PhysicalVertex { // Because the state may be RUNNING when the cluster is restarted, but the Task no longer exists. if (ExecutionState.RUNNING.equals(executionState)){ if (!checkTaskGroupIsExecuting(taskGroupLocation)) { + updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED); this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, null)); } } // If the task state is CANCELING we need call noticeTaskExecutionServiceCancel(). else if (ExecutionState.CANCELING.equals(executionState)) { - noticeTaskExecutionServiceCancel(3); - this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.CANCELED, null)); + noticeTaskExecutionServiceCancel(); } else if (executionState.isEndState()) { this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, executionState, null)); } @@ -374,16 +373,20 @@ public class PhysicalVertex { updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELED)) { taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null)); } else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) { - noticeTaskExecutionServiceCancel(Integer.MAX_VALUE); + noticeTaskExecutionServiceCancel(); } } @SuppressWarnings("checkstyle:MagicNumber") - private void noticeTaskExecutionServiceCancel(int tryTimes) { + private void noticeTaskExecutionServiceCancel() { + //Check whether the node exists, and whether the Task on the node exists. If there is no direct update state + if (!checkTaskGroupIsExecuting(taskGroupLocation)){ + updateTaskState(ExecutionState.CANCELING, ExecutionState.CANCELED); + taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null)); + } int i = 0; // In order not to generate uncontrolled tasks, We will try again until the taskFuture is completed - // If the cluster restart causes the number of nodes to change, it is meaningless to keep retrying - while (!taskFuture.isDone() && nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null && i < tryTimes) { + while (!taskFuture.isDone() && nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null) { try { i++; LOGGER.info( @@ -419,6 +422,15 @@ public class PhysicalVertex { private void resetExecutionState() { synchronized (this) { + ExecutionState executionState = getExecutionState(); + if (!executionState.isEndState()) { + String message = + String.format("%s reset state failed, only end state can be reset, current is %s", + getTaskFullName(), + executionState); + LOGGER.severe(message); + throw new IllegalStateException(message); + } updateStateTimestamps(ExecutionState.CREATED); runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED); }
