This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b23c2875b [Improve] [SeaTunnel-Engine] Cancel CheckpointCoordinator 
First Before Cancel Task (#3838)
b23c2875b is described below

commit b23c2875bfd9ea0e32783f54c5fbd932efb85a9c
Author: Hisoka <[email protected]>
AuthorDate: Fri Dec 30 20:49:17 2022 +0800

    [Improve] [SeaTunnel-Engine] Cancel CheckpointCoordinator First Before 
Cancel Task (#3838)
    
    * [Improve] [SeaTunnel-Engine] Cancel CheckpointCoordinator First Before 
Cancel Task
---
 .../engine/server/TaskExecutionService.java        |  4 +---
 .../engine/server/dag/physical/PhysicalVertex.java | 23 +++++++++++-----------
 .../engine/server/dag/physical/SubPlan.java        |  5 +++++
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index b2c527760..2b3aff44b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -268,8 +268,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                 invoke.get();
                 notifyStateSuccess = true;
             } catch (InterruptedException e) {
-                logger.severe(e);
-                Thread.interrupted();
+                logger.severe("send notify task status failed", e);
             } catch (ExecutionException e) {
                 logger.warning(ExceptionUtils.getMessage(e));
                 logger.warning(String.format("notify the job of the task(%s) 
status failed, retry in %s millis",
@@ -278,7 +277,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                     Thread.sleep(sleepTime);
                 } catch (InterruptedException ex) {
                     logger.severe(e);
-                    Thread.interrupted();
                 }
             }
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 49744bbd3..baf53dd2e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -306,13 +306,6 @@ public class PhysicalVertex {
 
     private boolean turnToEndState(@NonNull ExecutionState endState) {
         synchronized (this) {
-            // consistency check
-            ExecutionState currentState = (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
-            if (currentState.isEndState()) {
-                String message = String.format("Task %s is already in terminal 
state %s", taskFullName, currentState);
-                LOGGER.warning(message);
-                return false;
-            }
             if (!endState.isEndState()) {
                 String message =
                     String.format("Turn task %s state to end state need gave a 
end state, not %s", taskFullName,
@@ -320,6 +313,16 @@ public class PhysicalVertex {
                 LOGGER.warning(message);
                 return false;
             }
+            // consistency check
+            ExecutionState currentState = (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
+            if (currentState.equals(endState)) {
+                return true;
+            }
+            if (currentState.isEndState()) {
+                String message = String.format("Task %s is already in terminal 
state %s", taskFullName, currentState);
+                LOGGER.warning(message);
+                return false;
+            }
 
             updateStateTimestamps(endState);
             runningJobStateIMap.set(taskGroupLocation, endState);
@@ -372,10 +375,6 @@ public class PhysicalVertex {
         }
     }
 
-    public TaskGroupDefaultImpl getTaskGroup() {
-        return taskGroup;
-    }
-
     public void cancel() {
         if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) ||
             updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED) 
||
@@ -392,6 +391,7 @@ public class PhysicalVertex {
         if (!checkTaskGroupIsExecuting(taskGroupLocation)){
             updateTaskState(ExecutionState.CANCELING, ExecutionState.CANCELED);
             taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, 
ExecutionState.CANCELED, null));
+            return;
         }
         int i = 0;
         // In order not to generate uncontrolled tasks, We will try again 
until the taskFuture is completed
@@ -415,7 +415,6 @@ public class PhysicalVertex {
                 }
             }
         }
-        this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, 
ExecutionState.CANCELED, null));
     }
 
     private void updateStateTimestamps(@NonNull ExecutionState targetState) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index b1fc23b3b..aa049cfda 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -247,9 +247,14 @@ public class SubPlan {
         if 
(!PipelineStatus.CANCELING.equals(runningJobStateIMap.get(pipelineLocation))) {
             updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
         }
+        cancelCheckpointCoordinator();
         cancelPipelineTasks();
     }
 
+    private void cancelCheckpointCoordinator() {
+        jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId, 
PipelineStatus.CANCELING).join();
+    }
+
     private void cancelPipelineTasks() {
         List<CompletableFuture<Void>> coordinatorCancelList =
             
coordinatorVertexList.stream().map(this::cancelTask).filter(Objects::nonNull)

Reply via email to