This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c687a7adb9 [Fix][Zeta] Fix premature closure of the checkpoint thread 
pool causes abnormal task status (#9228)
c687a7adb9 is described below

commit c687a7adb998af6561bd9f99252ce6fd0d7d8f9b
Author: Jia Fan <[email protected]>
AuthorDate: Sun Apr 27 21:29:37 2025 +0800

    [Fix][Zeta] Fix premature closure of the checkpoint thread pool causes 
abnormal task status (#9228)
    
    Co-authored-by: Copilot <[email protected]>
---
 .../server/checkpoint/CheckpointCoordinator.java   |  5 ++-
 .../engine/server/dag/physical/PhysicalVertex.java |  7 ++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 49 ++++++++++++++++++++++
 3 files changed, 57 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index b221b00651..c34b69954c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -285,12 +285,15 @@ public class CheckpointCoordinator {
         if (checkpointCoordinatorFuture.isDone()) {
             return;
         }
-        cleanPendingCheckpoint(reason);
         updateStatus(CheckpointCoordinatorStatus.FAILED);
         checkpointCoordinatorFuture.complete(
                 new CheckpointCoordinatorState(
                         CheckpointCoordinatorStatus.FAILED, 
errorByPhysicalVertex.get()));
         checkpointManager.handleCheckpointError(pipelineId, false);
+        // Wait for the checkpoint manager to handle the error first: it cancels the 
other tasks using the
+        // checkpoint coordinator thread pool. Therefore the thread pool is shut down at 
the end of this
+        // method, so the thread is not interrupted before checkpoint error handling 
has finished.
+        cleanPendingCheckpoint(reason);
     }
 
     private void restoreTaskState(TaskLocation taskLocation) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 435cdc1e3c..368e0ba5ce 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -238,9 +238,10 @@ public class PhysicalVertex {
             try {
                 return (Boolean) invoke.get();
             } catch (InterruptedException | ExecutionException e) {
-                log.warn(
-                        "Execution of CheckTaskGroupIsExecutingOperation {} 
failed, checkTaskGroupIsExecuting return false. ",
-                        taskGroupLocation,
+                log.error(
+                        String.format(
+                                "Execution of 
CheckTaskGroupIsExecutingOperation %s failed, checkTaskGroupIsExecuting return 
false. ",
+                                taskGroupLocation),
                         e);
             }
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
index e43fdfa162..3a62cc6e64 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
@@ -20,17 +20,25 @@ package org.apache.seatunnel.engine.server.checkpoint;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import 
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import 
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
 
@@ -60,4 +68,45 @@ public class CheckpointCoordinatorTest
                                 999, System.currentTimeMillis(), 
CheckpointType.CHECKPOINT_TYPE),
                         new ArrayList<>()));
     }
+
+    @Test
+    void testSchedulerThreadShouldNotBeInterruptedBeforeJobMasterCleaned()
+            throws CheckpointStorageException, ExecutionException, 
InterruptedException,
+                    TimeoutException {
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        // quickly fail the checkpoint
+        checkpointConfig.setCheckpointTimeout(5000);
+        checkpointConfig.setStorage(new CheckpointStorageConfig());
+        Map<Integer, CheckpointPlan> planMap = new HashMap<>();
+        planMap.put(
+                1,
+                CheckpointPlan.builder()
+                        .pipelineId(1)
+                        .pipelineSubtasks(Collections.singleton(new 
TaskLocation()))
+                        .build());
+        CompletableFuture<Boolean> threadIsInterrupted = new 
CompletableFuture<>();
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        try {
+            CheckpointManager checkpointManager =
+                    new CheckpointManager(
+                            1L,
+                            false,
+                            nodeEngine,
+                            null,
+                            planMap,
+                            checkpointConfig,
+                            executorService,
+                            
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
+
+                        @Override
+                        protected void handleCheckpointError(int pipelineId, 
boolean neverRestore) {
+                            threadIsInterrupted.complete(Thread.interrupted());
+                        }
+                    };
+            checkpointManager.reportedPipelineRunning(1, true);
+            Assertions.assertFalse(threadIsInterrupted.get(1, 
TimeUnit.MINUTES));
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
 }

Reply via email to