This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c687a7adb9 [Fix][Zeta] Fix premature closure of the checkpoint thread
pool causes abnormal task status (#9228)
c687a7adb9 is described below
commit c687a7adb998af6561bd9f99252ce6fd0d7d8f9b
Author: Jia Fan <[email protected]>
AuthorDate: Sun Apr 27 21:29:37 2025 +0800
[Fix][Zeta] Fix premature closure of the checkpoint thread pool causes
abnormal task status (#9228)
Co-authored-by: Copilot <[email protected]>
---
.../server/checkpoint/CheckpointCoordinator.java | 5 ++-
.../engine/server/dag/physical/PhysicalVertex.java | 7 ++--
.../checkpoint/CheckpointCoordinatorTest.java | 49 ++++++++++++++++++++++
3 files changed, 57 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index b221b00651..c34b69954c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -285,12 +285,15 @@ public class CheckpointCoordinator {
if (checkpointCoordinatorFuture.isDone()) {
return;
}
- cleanPendingCheckpoint(reason);
updateStatus(CheckpointCoordinatorStatus.FAILED);
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(
CheckpointCoordinatorStatus.FAILED,
errorByPhysicalVertex.get()));
checkpointManager.handleCheckpointError(pipelineId, false);
+ // The checkpoint manager handles the error by cancelling the other tasks on the
+ // checkpoint coordinator thread pool, so we must let it finish first. Only then do we
+ // kill the thread pool, at the end of this method, so that the thread is not
+ // interrupted before checkpoint error handling has completed.
+ cleanPendingCheckpoint(reason);
}
private void restoreTaskState(TaskLocation taskLocation) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 435cdc1e3c..368e0ba5ce 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -238,9 +238,10 @@ public class PhysicalVertex {
try {
return (Boolean) invoke.get();
} catch (InterruptedException | ExecutionException e) {
- log.warn(
- "Execution of CheckTaskGroupIsExecutingOperation {}
failed, checkTaskGroupIsExecuting return false. ",
- taskGroupLocation,
+ log.error(
+ String.format(
+ "Execution of
CheckTaskGroupIsExecutingOperation %s failed, checkTaskGroupIsExecuting return
false. ",
+ taskGroupLocation),
e);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
index e43fdfa162..3a62cc6e64 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
@@ -20,17 +20,25 @@ package org.apache.seatunnel.engine.server.checkpoint;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static
org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
@@ -60,4 +68,45 @@ public class CheckpointCoordinatorTest
999, System.currentTimeMillis(),
CheckpointType.CHECKPOINT_TYPE),
new ArrayList<>()));
}
+
+ @Test
+ void testSchedulerThreadShouldNotBeInterruptedBeforeJobMasterCleaned()
+ throws CheckpointStorageException, ExecutionException,
InterruptedException,
+ TimeoutException {
+ CheckpointConfig checkpointConfig = new CheckpointConfig();
+ // quickly fail the checkpoint
+ checkpointConfig.setCheckpointTimeout(5000);
+ checkpointConfig.setStorage(new CheckpointStorageConfig());
+ Map<Integer, CheckpointPlan> planMap = new HashMap<>();
+ planMap.put(
+ 1,
+ CheckpointPlan.builder()
+ .pipelineId(1)
+ .pipelineSubtasks(Collections.singleton(new
TaskLocation()))
+ .build());
+ CompletableFuture<Boolean> threadIsInterrupted = new
CompletableFuture<>();
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ CheckpointManager checkpointManager =
+ new CheckpointManager(
+ 1L,
+ false,
+ nodeEngine,
+ null,
+ planMap,
+ checkpointConfig,
+ executorService,
+
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
+
+ @Override
+ protected void handleCheckpointError(int pipelineId,
boolean neverRestore) {
+ threadIsInterrupted.complete(Thread.interrupted());
+ }
+ };
+ checkpointManager.reportedPipelineRunning(1, true);
+ Assertions.assertFalse(threadIsInterrupted.get(1,
TimeUnit.MINUTES));
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
}