This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b23c2875b [Improve] [SeaTunnel-Engine] Cancel CheckpointCoordinator
First Before Cancel Task (#3838)
b23c2875b is described below
commit b23c2875bfd9ea0e32783f54c5fbd932efb85a9c
Author: Hisoka <[email protected]>
AuthorDate: Fri Dec 30 20:49:17 2022 +0800
[Improve] [SeaTunnel-Engine] Cancel CheckpointCoordinator First Before
Cancel Task (#3838)
* [Improve] [SeaTunnel-Engine] Cancel CheckpointCoordinator First Before
Cancel Task
---
.../engine/server/TaskExecutionService.java | 4 +---
.../engine/server/dag/physical/PhysicalVertex.java | 23 +++++++++++-----------
.../engine/server/dag/physical/SubPlan.java | 5 +++++
3 files changed, 17 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index b2c527760..2b3aff44b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -268,8 +268,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
invoke.get();
notifyStateSuccess = true;
} catch (InterruptedException e) {
- logger.severe(e);
- Thread.interrupted();
+ logger.severe("send notify task status failed", e);
} catch (ExecutionException e) {
logger.warning(ExceptionUtils.getMessage(e));
logger.warning(String.format("notify the job of the task(%s)
status failed, retry in %s millis",
@@ -278,7 +277,6 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
logger.severe(e);
- Thread.interrupted();
}
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 49744bbd3..baf53dd2e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -306,13 +306,6 @@ public class PhysicalVertex {
private boolean turnToEndState(@NonNull ExecutionState endState) {
synchronized (this) {
- // consistency check
- ExecutionState currentState = (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
- if (currentState.isEndState()) {
- String message = String.format("Task %s is already in terminal
state %s", taskFullName, currentState);
- LOGGER.warning(message);
- return false;
- }
if (!endState.isEndState()) {
String message =
String.format("Turn task %s state to end state need gave a
end state, not %s", taskFullName,
@@ -320,6 +313,16 @@ public class PhysicalVertex {
LOGGER.warning(message);
return false;
}
+ // consistency check
+ ExecutionState currentState = (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
+ if (currentState.equals(endState)) {
+ return true;
+ }
+ if (currentState.isEndState()) {
+ String message = String.format("Task %s is already in terminal
state %s", taskFullName, currentState);
+ LOGGER.warning(message);
+ return false;
+ }
updateStateTimestamps(endState);
runningJobStateIMap.set(taskGroupLocation, endState);
@@ -372,10 +375,6 @@ public class PhysicalVertex {
}
}
- public TaskGroupDefaultImpl getTaskGroup() {
- return taskGroup;
- }
-
public void cancel() {
if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) ||
updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED)
||
@@ -392,6 +391,7 @@ public class PhysicalVertex {
if (!checkTaskGroupIsExecuting(taskGroupLocation)){
updateTaskState(ExecutionState.CANCELING, ExecutionState.CANCELED);
taskFuture.complete(new TaskExecutionState(this.taskGroupLocation,
ExecutionState.CANCELED, null));
+ return;
}
int i = 0;
// In order not to generate uncontrolled tasks, We will try again
until the taskFuture is completed
@@ -415,7 +415,6 @@ public class PhysicalVertex {
}
}
}
- this.taskFuture.complete(new TaskExecutionState(taskGroupLocation,
ExecutionState.CANCELED, null));
}
private void updateStateTimestamps(@NonNull ExecutionState targetState) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index b1fc23b3b..aa049cfda 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -247,9 +247,14 @@ public class SubPlan {
if
(!PipelineStatus.CANCELING.equals(runningJobStateIMap.get(pipelineLocation))) {
updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
}
+ cancelCheckpointCoordinator();
cancelPipelineTasks();
}
+ private void cancelCheckpointCoordinator() {
+ jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId,
PipelineStatus.CANCELING).join();
+ }
+
private void cancelPipelineTasks() {
List<CompletableFuture<Void>> coordinatorCancelList =
coordinatorVertexList.stream().map(this::cancelTask).filter(Objects::nonNull)