This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 689828011 [Improve][ST-Engine] Improve job restart of all node down (#3784)
689828011 is described below
commit 6898280110e95db3efa811dfab105ec943b4e599
Author: ic4y <[email protected]>
AuthorDate: Tue Dec 27 18:16:35 2022 +0800
[Improve][ST-Engine] Improve job restart of all node down (#3784)
* [hotfix][ST-Engine] fix job restart of all node down
* fix ClusterFaultToleranceIT
* fix checkStyle
---
.../engine/server/dag/physical/PhysicalVertex.java | 35 +++++++++++++++++-----
1 file changed, 28 insertions(+), 7 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index d6bc9f5f5..87143874f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.server.task.operation.CheckTaskGroupIsExecuti
import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.Member;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -44,12 +45,14 @@ import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.NonNull;
import java.net.URL;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
/**
* PhysicalVertex is responsible for the scheduling and execution of a single
task parallel
@@ -181,7 +184,6 @@ public class PhysicalVertex {
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
}
- @SuppressWarnings("checkstyle:MagicNumber")
public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
this.taskFuture = new CompletableFuture<>();
ExecutionState executionState = (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
@@ -194,13 +196,13 @@ public class PhysicalVertex {
// Because the state may be RUNNING when the cluster is restarted, but
the Task no longer exists.
if (ExecutionState.RUNNING.equals(executionState)){
if (!checkTaskGroupIsExecuting(taskGroupLocation)) {
+ updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
this.taskFuture.complete(new
TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, null));
}
}
// If the task state is CANCELING we need call
noticeTaskExecutionServiceCancel().
else if (ExecutionState.CANCELING.equals(executionState)) {
- noticeTaskExecutionServiceCancel(3);
- this.taskFuture.complete(new TaskExecutionState(taskGroupLocation,
ExecutionState.CANCELED, null));
+ noticeTaskExecutionServiceCancel();
} else if (executionState.isEndState()) {
this.taskFuture.complete(new TaskExecutionState(taskGroupLocation,
executionState, null));
}
@@ -212,6 +214,12 @@ public class PhysicalVertex {
SlotProfile slotProfile =
getOwnedSlotProfilesByTaskGroup(taskGroupLocation, ownedSlotProfilesIMap);
if (null != slotProfile){
Address worker = slotProfile.getWorker();
+ List<Address> members =
nodeEngine.getClusterService().getMembers().stream().map(Member::getAddress)
+ .collect(Collectors.toList());
+ if (!members.contains(worker)){
+ LOGGER.warning("The node:" + worker.toString() + " running the
taskGroup no longer exists, return false.");
+ return false;
+ }
InvocationFuture<Object> invoke =
nodeEngine.getOperationService().createInvocationBuilder(
SeaTunnelServer.SERVICE_NAME,
new CheckTaskGroupIsExecutingOperation(taskGroupLocation),
@@ -374,16 +382,20 @@ public class PhysicalVertex {
updateTaskState(ExecutionState.DEPLOYING,
ExecutionState.CANCELED)) {
taskFuture.complete(new TaskExecutionState(this.taskGroupLocation,
ExecutionState.CANCELED, null));
} else if (updateTaskState(ExecutionState.RUNNING,
ExecutionState.CANCELING)) {
- noticeTaskExecutionServiceCancel(Integer.MAX_VALUE);
+ noticeTaskExecutionServiceCancel();
}
}
@SuppressWarnings("checkstyle:MagicNumber")
- private void noticeTaskExecutionServiceCancel(int tryTimes) {
+ private void noticeTaskExecutionServiceCancel() {
+ //Check whether the node exists, and whether the Task on the node
exists. If there is no direct update state
+ if (!checkTaskGroupIsExecuting(taskGroupLocation)){
+ updateTaskState(ExecutionState.CANCELING, ExecutionState.CANCELED);
+ taskFuture.complete(new TaskExecutionState(this.taskGroupLocation,
ExecutionState.CANCELED, null));
+ }
int i = 0;
// In order not to generate uncontrolled tasks, We will try again
until the taskFuture is completed
- // If the cluster restart causes the number of nodes to change, it is
meaningless to keep retrying
- while (!taskFuture.isDone() &&
nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null
&& i < tryTimes) {
+ while (!taskFuture.isDone() &&
nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null)
{
try {
i++;
LOGGER.info(
@@ -419,6 +431,15 @@ public class PhysicalVertex {
private void resetExecutionState() {
synchronized (this) {
+ ExecutionState executionState = getExecutionState();
+ if (!executionState.isEndState()) {
+ String message =
+ String.format("%s reset state failed, only end state can
be reset, current is %s",
+ getTaskFullName(),
+ executionState);
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
updateStateTimestamps(ExecutionState.CREATED);
runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED);
}