This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 961320039 [Improve][Zeta] Fix engine runtime error (#4850)
961320039 is described below
commit 9613200396f5297a9066778b53c19d17c281e7f5
Author: Jia Fan <[email protected]>
AuthorDate: Tue May 30 15:54:02 2023 +0800
[Improve][Zeta] Fix engine runtime error (#4850)
---
.../engine/client/util/ContentFormatUtil.java | 11 ++--
.../engine/server/dag/physical/PhysicalVertex.java | 2 +
.../engine/server/dag/physical/SubPlan.java | 72 ++++++++++++----------
3 files changed, 48 insertions(+), 37 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
index de03023e7..4982215ca 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
@@ -36,9 +36,12 @@ public class ContentFormatUtil {
for (JobStatusData jobStatusData : jobStatusDataList) {
maxJobIdLength =
Math.max(maxJobIdLength,
String.valueOf(jobStatusData.getJobId()).length());
- maxJobNameLength = Math.max(maxJobNameLength,
jobStatusData.getJobName().length());
+ maxJobNameLength =
+ Math.max(maxJobNameLength,
String.valueOf(jobStatusData.getJobName()).length());
maxJobStatusLength =
- Math.max(maxJobStatusLength,
jobStatusData.getJobStatus().toString().length());
+ Math.max(
+ maxJobStatusLength,
+
String.valueOf(jobStatusData.getJobStatus()).length());
}
String formatStr =
@@ -76,9 +79,7 @@ public class ContentFormatUtil {
String jobName =
String.format("%-" + maxJobNameLength + "s",
jobStatusData.getJobName());
String jobStatus =
- String.format(
- "%-" + maxJobStatusLength + "s",
- jobStatusData.getJobStatus().toString());
+ String.format("%-" + maxJobStatusLength + "s",
jobStatusData.getJobStatus());
String submitTime =
String.format(
"%-" + maxSubmitTimeLength + "s",
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 07c19353b..a95e7e8fd 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -418,6 +418,8 @@ public class PhysicalVertex {
new TaskExecutionState(this.taskGroupLocation,
ExecutionState.CANCELED));
} else if (updateTaskState(ExecutionState.RUNNING,
ExecutionState.CANCELING)) {
noticeTaskExecutionServiceCancel();
+ } else if
(ExecutionState.CANCELING.equals(runningJobStateIMap.get(taskGroupLocation))) {
+ noticeTaskExecutionServiceCancel();
}
LOGGER.info(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index a6cb3e6eb..1540db61f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -18,8 +18,6 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.common.utils.RetryUtils;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
@@ -30,15 +28,17 @@ import
org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.master.JobMaster;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
import lombok.Data;
import lombok.NonNull;
-import java.io.IOException;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -355,32 +355,39 @@ public class SubPlan {
}
public synchronized void cancelPipeline() {
- try {
- RetryUtils.retryWithException(
- () -> {
- if (getPipelineState().isEndState()) {
- LOGGER.warning(
- String.format(
- "%s is in end state %s, can not be
cancel",
- pipelineFullName,
getPipelineState()));
- return null;
- }
- // If an active Master Node done and another Master
Node active, we can not
- // know whether
- // canceled pipeline
- // complete. So we need cancel running pipeline again.
- if (!PipelineStatus.CANCELING.equals(
- runningJobStateIMap.get(pipelineLocation))) {
- updatePipelineState(getPipelineState(),
PipelineStatus.CANCELING);
- }
- cancelCheckpointCoordinator();
- cancelPipelineTasks();
- return null;
- },
- new RetryUtils.RetryMaterial(
- 30, true, e -> e instanceof IOException, 1000,
true));
- } catch (Exception e) {
- throw new SeaTunnelEngineException(e);
+ for (int i = 0; i < 10; i++) {
+ try {
+ LOGGER.warning("start cancel job " + pipelineFullName + "
count = " + i);
+ if (getPipelineState().isEndState()) {
+ LOGGER.warning(
+ String.format(
+ "%s is in end state %s, can not be cancel",
+ pipelineFullName, getPipelineState()));
+ }
+ // If an active Master Node done and another Master Node
active, we can
+ // not know whether canceled pipeline complete.
+ // So we need cancel running pipeline again.
+ if
(!PipelineStatus.CANCELING.equals(runningJobStateIMap.get(pipelineLocation))) {
+ updatePipelineState(getPipelineState(),
PipelineStatus.CANCELING);
+ }
+ cancelCheckpointCoordinator();
+ Optional<Exception> optionalException = cancelPipelineTasks();
+ if (optionalException.isPresent()) {
+ throw optionalException.get();
+ }
+ break;
+ } catch (OperationTimeoutException
+ | HazelcastInstanceNotActiveException
+ | InterruptedException e) {
+ try {
+ Thread.sleep(2000);
+ } catch (Exception ignored) {
+ }
+ LOGGER.warning(String.format("%s cancel error will retry",
pipelineFullName), e);
+ } catch (Throwable e) {
+ LOGGER.warning(String.format("%s cancel error",
pipelineFullName), e);
+ break;
+ }
}
}
@@ -390,7 +397,7 @@ public class SubPlan {
}
}
- private void cancelPipelineTasks() {
+ private Optional<Exception> cancelPipelineTasks() {
List<CompletableFuture<Void>> coordinatorCancelList =
coordinatorVertexList.stream()
.map(this::cancelTask)
@@ -407,14 +414,15 @@ public class SubPlan {
coordinatorCancelList.addAll(taskCancelList);
CompletableFuture<Void> voidCompletableFuture =
CompletableFuture.allOf(
- coordinatorCancelList.toArray(
- new
CompletableFuture[coordinatorCancelList.size()]));
+ coordinatorCancelList.toArray(new
CompletableFuture[0]));
voidCompletableFuture.get();
+ return Optional.empty();
} catch (Exception e) {
LOGGER.severe(
String.format(
"%s cancel error with exception: %s",
pipelineFullName, ExceptionUtils.getMessage(e)));
+ return Optional.of(e);
}
}