This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 961320039 [Improve][Zeta] Fix engine runtime error (#4850)
961320039 is described below

commit 9613200396f5297a9066778b53c19d17c281e7f5
Author: Jia Fan <[email protected]>
AuthorDate: Tue May 30 15:54:02 2023 +0800

    [Improve][Zeta] Fix engine runtime error (#4850)
---
 .../engine/client/util/ContentFormatUtil.java      | 11 ++--
 .../engine/server/dag/physical/PhysicalVertex.java |  2 +
 .../engine/server/dag/physical/SubPlan.java        | 72 ++++++++++++----------
 3 files changed, 48 insertions(+), 37 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
index de03023e7..4982215ca 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
@@ -36,9 +36,12 @@ public class ContentFormatUtil {
         for (JobStatusData jobStatusData : jobStatusDataList) {
             maxJobIdLength =
                     Math.max(maxJobIdLength, String.valueOf(jobStatusData.getJobId()).length());
-            maxJobNameLength = Math.max(maxJobNameLength, jobStatusData.getJobName().length());
+            maxJobNameLength =
+                    Math.max(maxJobNameLength, String.valueOf(jobStatusData.getJobName()).length());
             maxJobStatusLength =
-                    Math.max(maxJobStatusLength, jobStatusData.getJobStatus().toString().length());
+                    Math.max(
+                            maxJobStatusLength,
+                            String.valueOf(jobStatusData.getJobStatus()).length());
         }
 
         String formatStr =
@@ -76,9 +79,7 @@ public class ContentFormatUtil {
             String jobName =
                     String.format("%-" + maxJobNameLength + "s", jobStatusData.getJobName());
             String jobStatus =
-                    String.format(
-                            "%-" + maxJobStatusLength + "s",
-                            jobStatusData.getJobStatus().toString());
+                    String.format("%-" + maxJobStatusLength + "s", jobStatusData.getJobStatus());
             String submitTime =
                     String.format(
                             "%-" + maxSubmitTimeLength + "s",
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 07c19353b..a95e7e8fd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -418,6 +418,8 @@ public class PhysicalVertex {
                     new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED));
         } else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) {
             noticeTaskExecutionServiceCancel();
+        } else if (ExecutionState.CANCELING.equals(runningJobStateIMap.get(taskGroupLocation))) {
+            noticeTaskExecutionServiceCancel();
         }
 
         LOGGER.info(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index a6cb3e6eb..1540db61f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -18,8 +18,6 @@
 package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.common.utils.RetryUtils;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
@@ -30,15 +28,17 @@ import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.core.OperationTimeoutException;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import com.hazelcast.map.IMap;
 import lombok.Data;
 import lombok.NonNull;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -355,32 +355,39 @@ public class SubPlan {
     }
 
     public synchronized void cancelPipeline() {
-        try {
-            RetryUtils.retryWithException(
-                    () -> {
-                        if (getPipelineState().isEndState()) {
-                            LOGGER.warning(
-                                    String.format(
-                                            "%s is in end state %s, can not be cancel",
-                                            pipelineFullName, getPipelineState()));
-                            return null;
-                        }
-                        // If an active Master Node done and another Master Node active, we can not
-                        // know whether
-                        // canceled pipeline
-                        // complete. So we need cancel running pipeline again.
-                        if (!PipelineStatus.CANCELING.equals(
-                                runningJobStateIMap.get(pipelineLocation))) {
-                            updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
-                        }
-                        cancelCheckpointCoordinator();
-                        cancelPipelineTasks();
-                        return null;
-                    },
-                    new RetryUtils.RetryMaterial(
-                            30, true, e -> e instanceof IOException, 1000, true));
-        } catch (Exception e) {
-            throw new SeaTunnelEngineException(e);
+        for (int i = 0; i < 10; i++) {
+            try {
+                LOGGER.warning("start cancel job " + pipelineFullName + " count = " + i);
+                if (getPipelineState().isEndState()) {
+                    LOGGER.warning(
+                            String.format(
+                                    "%s is in end state %s, can not be cancel",
+                                    pipelineFullName, getPipelineState()));
+                }
+                // If an active Master Node done and another Master Node active, we can
+                // not know whether canceled pipeline complete.
+                // So we need cancel running pipeline again.
+                if (!PipelineStatus.CANCELING.equals(runningJobStateIMap.get(pipelineLocation))) {
+                    updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
+                }
+                cancelCheckpointCoordinator();
+                Optional<Exception> optionalException = cancelPipelineTasks();
+                if (optionalException.isPresent()) {
+                    throw optionalException.get();
+                }
+                break;
+            } catch (OperationTimeoutException
+                    | HazelcastInstanceNotActiveException
+                    | InterruptedException e) {
+                try {
+                    Thread.sleep(2000);
+                } catch (Exception ignored) {
+                }
+                LOGGER.warning(String.format("%s cancel error will retry", pipelineFullName), e);
+            } catch (Throwable e) {
+                LOGGER.warning(String.format("%s cancel error", pipelineFullName), e);
+                break;
+            }
         }
     }
 
@@ -390,7 +397,7 @@ public class SubPlan {
         }
     }
 
-    private void cancelPipelineTasks() {
+    private Optional<Exception> cancelPipelineTasks() {
         List<CompletableFuture<Void>> coordinatorCancelList =
                 coordinatorVertexList.stream()
                         .map(this::cancelTask)
@@ -407,14 +414,15 @@ public class SubPlan {
             coordinatorCancelList.addAll(taskCancelList);
             CompletableFuture<Void> voidCompletableFuture =
                     CompletableFuture.allOf(
-                            coordinatorCancelList.toArray(
-                                    new CompletableFuture[coordinatorCancelList.size()]));
+                            coordinatorCancelList.toArray(new CompletableFuture[0]));
             voidCompletableFuture.get();
+            return Optional.empty();
         } catch (Exception e) {
             LOGGER.severe(
                     String.format(
                             "%s cancel error with exception: %s",
                             pipelineFullName, ExceptionUtils.getMessage(e)));
+            return Optional.of(e);
         }
     }
 

Reply via email to