This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 689828011 [Improve][ST-Engine] Improve job restart of all node down 
(#3784)
689828011 is described below

commit 6898280110e95db3efa811dfab105ec943b4e599
Author: ic4y <[email protected]>
AuthorDate: Tue Dec 27 18:16:35 2022 +0800

    [Improve][ST-Engine] Improve job restart of all node down (#3784)
    
    * [hotfix][ST-Engine] fix job restart of all node down
    
    * fix ClusterFaultToleranceIT
    
    * fix checkStyle
---
 .../engine/server/dag/physical/PhysicalVertex.java | 35 +++++++++++++++++-----
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index d6bc9f5f5..87143874f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -35,6 +35,7 @@ import 
org.apache.seatunnel.engine.server.task.operation.CheckTaskGroupIsExecuti
 import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
 
 import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.Member;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
@@ -44,12 +45,14 @@ import 
com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.NonNull;
 
 import java.net.URL;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * PhysicalVertex is responsible for the scheduling and execution of a single 
task parallel
@@ -181,7 +184,6 @@ public class PhysicalVertex {
         this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
     }
 
-    @SuppressWarnings("checkstyle:MagicNumber")
     public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
         this.taskFuture = new CompletableFuture<>();
         ExecutionState executionState = (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
@@ -194,13 +196,13 @@ public class PhysicalVertex {
         // Because the state may be RUNNING when the cluster is restarted, but 
the Task no longer exists.
         if (ExecutionState.RUNNING.equals(executionState)){
             if (!checkTaskGroupIsExecuting(taskGroupLocation)) {
+                updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
                 this.taskFuture.complete(new 
TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, null));
             }
         }
         // If the task state is CANCELING we need call 
noticeTaskExecutionServiceCancel().
         else if (ExecutionState.CANCELING.equals(executionState)) {
-            noticeTaskExecutionServiceCancel(3);
-            this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, 
ExecutionState.CANCELED, null));
+            noticeTaskExecutionServiceCancel();
         } else if (executionState.isEndState()) {
             this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, 
executionState, null));
         }
@@ -212,6 +214,12 @@ public class PhysicalVertex {
         SlotProfile slotProfile = 
getOwnedSlotProfilesByTaskGroup(taskGroupLocation, ownedSlotProfilesIMap);
         if (null != slotProfile){
             Address worker = slotProfile.getWorker();
+            List<Address> members = 
nodeEngine.getClusterService().getMembers().stream().map(Member::getAddress)
+                .collect(Collectors.toList());
+            if (!members.contains(worker)){
+                LOGGER.warning("The node:" + worker.toString() + " running the 
taskGroup no longer exists, return false.");
+                return false;
+            }
             InvocationFuture<Object> invoke = 
nodeEngine.getOperationService().createInvocationBuilder(
                 SeaTunnelServer.SERVICE_NAME,
                 new CheckTaskGroupIsExecutingOperation(taskGroupLocation),
@@ -374,16 +382,20 @@ public class PhysicalVertex {
             updateTaskState(ExecutionState.DEPLOYING, 
ExecutionState.CANCELED)) {
             taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, 
ExecutionState.CANCELED, null));
         } else if (updateTaskState(ExecutionState.RUNNING, 
ExecutionState.CANCELING)) {
-            noticeTaskExecutionServiceCancel(Integer.MAX_VALUE);
+            noticeTaskExecutionServiceCancel();
         }
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    private void noticeTaskExecutionServiceCancel(int tryTimes) {
+    private void noticeTaskExecutionServiceCancel() {
+        //Check whether the node exists, and whether the Task on the node 
exists. If there is no direct update state
+        if (!checkTaskGroupIsExecuting(taskGroupLocation)){
+            updateTaskState(ExecutionState.CANCELING, ExecutionState.CANCELED);
+            taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, 
ExecutionState.CANCELED, null));
+        }
         int i = 0;
         // In order not to generate uncontrolled tasks, We will try again 
until the taskFuture is completed
-        // If the cluster restart causes the number of nodes to change, it is 
meaningless to keep retrying
-        while (!taskFuture.isDone() && 
nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null 
&& i < tryTimes) {
+        while (!taskFuture.isDone() && 
nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null) 
{
             try {
                 i++;
                 LOGGER.info(
@@ -419,6 +431,15 @@ public class PhysicalVertex {
 
     private void resetExecutionState() {
         synchronized (this) {
+            ExecutionState executionState = getExecutionState();
+            if (!executionState.isEndState()) {
+                String message =
+                    String.format("%s reset state failed, only end state can 
be reset, current is %s",
+                        getTaskFullName(),
+                        executionState);
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
             updateStateTimestamps(ExecutionState.CREATED);
             runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED);
         }

Reply via email to