This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev_jobrestart
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit c13d2704f14bd8510f6042c2c7cdeae0847aa2c3
Author: liuli <[email protected]>
AuthorDate: Fri Dec 16 19:53:19 2022 +0800

    [hotfix][ST-Engine] fix job restart when all nodes are down
---
 .../engine/server/dag/physical/PhysicalVertex.java | 26 ++++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index d6bc9f5f5..bb0675e70 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -181,7 +181,6 @@ public class PhysicalVertex {
         this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
     }
 
-    @SuppressWarnings("checkstyle:MagicNumber")
     public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
         this.taskFuture = new CompletableFuture<>();
         ExecutionState executionState = (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
@@ -194,13 +193,13 @@ public class PhysicalVertex {
         // Because the state may be RUNNING when the cluster is restarted, but 
the Task no longer exists.
         if (ExecutionState.RUNNING.equals(executionState)){
             if (!checkTaskGroupIsExecuting(taskGroupLocation)) {
+                updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
                 this.taskFuture.complete(new 
TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, null));
             }
         }
         // If the task state is CANCELING we need call 
noticeTaskExecutionServiceCancel().
         else if (ExecutionState.CANCELING.equals(executionState)) {
-            noticeTaskExecutionServiceCancel(3);
-            this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, 
ExecutionState.CANCELED, null));
+            noticeTaskExecutionServiceCancel();
         } else if (executionState.isEndState()) {
             this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, 
executionState, null));
         }
@@ -374,16 +373,20 @@ public class PhysicalVertex {
             updateTaskState(ExecutionState.DEPLOYING, 
ExecutionState.CANCELED)) {
             taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, 
ExecutionState.CANCELED, null));
         } else if (updateTaskState(ExecutionState.RUNNING, 
ExecutionState.CANCELING)) {
-            noticeTaskExecutionServiceCancel(Integer.MAX_VALUE);
+            noticeTaskExecutionServiceCancel();
         }
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    private void noticeTaskExecutionServiceCancel(int tryTimes) {
+    private void noticeTaskExecutionServiceCancel() {
+        //Check whether the node exists, and whether the Task on the node 
exists. If there is no direct update state
+        if (!checkTaskGroupIsExecuting(taskGroupLocation)){
+            updateTaskState(ExecutionState.CANCELING, ExecutionState.CANCELED);
+            taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, 
ExecutionState.CANCELED, null));
+        }
         int i = 0;
         // In order not to generate uncontrolled tasks, We will try again 
until the taskFuture is completed
-        // If the cluster restart causes the number of nodes to change, it is 
meaningless to keep retrying
-        while (!taskFuture.isDone() && 
nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null 
&& i < tryTimes) {
+        while (!taskFuture.isDone() && 
nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null) 
{
             try {
                 i++;
                 LOGGER.info(
@@ -419,6 +422,15 @@ public class PhysicalVertex {
 
     private void resetExecutionState() {
         synchronized (this) {
+            ExecutionState executionState = getExecutionState();
+            if (!executionState.isEndState()) {
+                String message =
+                    String.format("%s reset state failed, only end state can 
be reset, current is %s",
+                        getTaskFullName(),
+                        executionState);
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
             updateStateTimestamps(ExecutionState.CREATED);
             runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED);
         }

Reply via email to