This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 47becb96f [Bug] [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED. (#3808)
47becb96f is described below
commit 47becb96fa9fc8b4a5def6fc3a6db2ce34ea83df
Author: Hisoka <[email protected]>
AuthorDate: Wed Dec 28 15:18:46 2022 +0800
[Bug] [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED. (#3808)
* [Bug] [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED.
---
.../engine/server/CoordinatorService.java | 1 -
...ilureReason.java => CheckpointCloseReason.java} | 5 +-
.../server/checkpoint/CheckpointCoordinator.java | 60 ++++++++++++++--------
.../server/checkpoint/CheckpointException.java | 22 ++++----
.../server/checkpoint/CheckpointManager.java | 7 ++-
.../server/checkpoint/PendingCheckpoint.java | 10 ++--
.../engine/server/dag/physical/PhysicalVertex.java | 1 +
.../engine/server/dag/physical/SubPlan.java | 3 +-
.../engine/server/master/JobMasterTest.java | 4 +-
9 files changed, 68 insertions(+), 45 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 1f7263e1d..086fa6783 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -263,7 +263,6 @@ public class CoordinatorService {
onJobDone(jobMaster, jobId);
}
});
- return;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
similarity index 90%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index ccd121f74..919500ca4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -17,17 +17,18 @@
package org.apache.seatunnel.engine.server.checkpoint;
-public enum CheckpointFailureReason {
+public enum CheckpointCloseReason {
PIPELINE_END("Pipeline turn to end state."),
CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
+ CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error.");
private final String message;
- CheckpointFailureReason(String message) {
+ CheckpointCloseReason(String message) {
this.message = message;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index abdde3482..8d275b657 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -46,7 +46,6 @@ import
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.Getter;
-import lombok.Setter;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,8 +122,6 @@ public class CheckpointCoordinator {
* Flag marking the coordinator as shut down (not accepting any messages
anymore).
*/
private volatile boolean shutdown;
-
- @Setter
private volatile boolean isAllTaskReady = false;
@SneakyThrows
@@ -184,17 +181,17 @@ public class CheckpointCoordinator {
break;
}
}).exceptionally(error -> {
- handleCoordinatorError("task running failed", error,
CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
+ handleCoordinatorError("task running failed", error,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
return null;
});
}
- private void handleCoordinatorError(String message, Throwable e,
CheckpointFailureReason reason) {
+ private void handleCoordinatorError(String message, Throwable e,
CheckpointCloseReason reason) {
LOG.error(message, e);
handleCoordinatorError(reason);
}
- private void handleCoordinatorError(CheckpointFailureReason reason) {
+ private void handleCoordinatorError(CheckpointCloseReason reason) {
cleanPendingCheckpoint(reason);
checkpointManager.handleCheckpointError(pipelineId, new
CheckpointException(reason));
}
@@ -259,10 +256,26 @@ public class CheckpointCoordinator {
}
}
+ protected void restoreCoordinator(boolean alreadyStarted) {
+
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
+ shutdown = false;
+ if (alreadyStarted) {
+ tryTriggerPendingCheckpoint();
+ isAllTaskReady = true;
+ } else {
+ isAllTaskReady = false;
+ }
+ }
+
protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
synchronized (lock) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("currentThread already be interrupted, skip trigger
checkpoint");
+ return;
+ }
if (isCompleted() || isShutdown()) {
- LOG.warn(String.format("can't trigger checkpoint with type:
%s, because checkpoint coordinator already have completed checkpoint",
checkpointType));
+ LOG.warn(String.format("can't trigger checkpoint with type:
%s, because checkpoint coordinator already have last completed checkpoint: (%s)
or shutdown (%b).",
+ checkpointType, latestCompletedCheckpoint != null ?
latestCompletedCheckpoint.getCheckpointType() : "null", shutdown));
return;
}
final long currentTimestamp = Instant.now().toEpochMilli();
@@ -322,18 +335,20 @@ public class CheckpointCoordinator {
}
private void
startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint>
pendingCompletableFuture) {
- pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
+ pendingCompletableFuture.thenAccept(pendingCheckpoint -> {
LOG.info("wait checkpoint completed: " +
pendingCheckpoint.getCheckpointId());
PassiveCompletableFuture<CompletedCheckpoint> completableFuture =
pendingCheckpoint.getCompletableFuture();
completableFuture.whenComplete((completedCheckpoint, error) -> {
if (error != null) {
- handleCoordinatorError("trigger checkpoint failed", error,
CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
- } else {
+ handleCoordinatorError("trigger checkpoint failed", error,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
+ } else if (completedCheckpoint != null) {
try {
completePendingCheckpoint(completedCheckpoint);
} catch (Throwable e) {
- handleCoordinatorError("complete checkpoint failed",
e, CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
+ handleCoordinatorError("complete checkpoint failed",
e, CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
}
+ } else {
+ LOG.info("skip this checkpoint cause by
completedCheckpoint is null");
}
});
@@ -359,7 +374,7 @@ public class CheckpointCoordinator {
// If any task is not acked within the checkpoint timeout
if
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null &&
!pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0) {
-
handleCoordinatorError(CheckpointFailureReason.CHECKPOINT_EXPIRED);
+
handleCoordinatorError(CheckpointCloseReason.CHECKPOINT_EXPIRED);
}
}
}, coordinatorConfig.getCheckpointTimeout(),
@@ -438,13 +453,18 @@ public class CheckpointCoordinator {
.toArray(InvocationFuture[]::new);
}
- protected void cleanPendingCheckpoint(CheckpointFailureReason
failureReason) {
+ protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) {
+ shutdown = true;
+ isAllTaskReady = false;
synchronized (lock) {
- pendingCheckpoints.values().forEach(pendingCheckpoint ->
- pendingCheckpoint.abortCheckpoint(failureReason, null)
- );
- // TODO: clear related future & scheduler task
- pendingCheckpoints.clear();
+ LOG.info("start clean pending checkpoint cause {}",
closedReason.message());
+ if (!pendingCheckpoints.isEmpty()) {
+ pendingCheckpoints.values().forEach(pendingCheckpoint ->
+ pendingCheckpoint.abortCheckpoint(closedReason, null)
+ );
+ // TODO: clear related future & scheduler task
+ pendingCheckpoints.clear();
+ }
pendingCounter.set(0);
scheduler.shutdownNow();
scheduler = Executors.newScheduledThreadPool(
@@ -454,7 +474,6 @@ public class CheckpointCoordinator {
thread.setName(String.format("checkpoint-coordinator-%s/%s", pipelineId,
jobId));
return thread;
});
- isAllTaskReady = false;
}
}
@@ -527,8 +546,7 @@ public class CheckpointCoordinator {
// TODO: notifyCheckpointCompleted fail
latestCompletedCheckpoint = completedCheckpoint;
if (isCompleted()) {
- shutdown = true;
-
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_COORDINATOR_COMPLETED);
+
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
index 32832f900..a29e21df9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
@@ -25,30 +25,30 @@ public class CheckpointException extends Exception {
private static final long serialVersionUID = 3257526119022486948L;
- private final CheckpointFailureReason checkpointFailureReason;
+ private final CheckpointCloseReason checkpointCloseReason;
- public CheckpointException(CheckpointFailureReason failureReason) {
+ public CheckpointException(CheckpointCloseReason failureReason) {
super(failureReason.message());
- this.checkpointFailureReason = checkNotNull(failureReason);
+ this.checkpointCloseReason = checkNotNull(failureReason);
}
- public CheckpointException(String message, CheckpointFailureReason
failureReason) {
+ public CheckpointException(String message, CheckpointCloseReason
failureReason) {
super(message + " Failure reason: " + failureReason.message());
- this.checkpointFailureReason = checkNotNull(failureReason);
+ this.checkpointCloseReason = checkNotNull(failureReason);
}
- public CheckpointException(CheckpointFailureReason failureReason,
Throwable cause) {
+ public CheckpointException(CheckpointCloseReason failureReason, Throwable
cause) {
super(failureReason.message(), cause);
- this.checkpointFailureReason = checkNotNull(failureReason);
+ this.checkpointCloseReason = checkNotNull(failureReason);
}
public CheckpointException(
- String message, CheckpointFailureReason failureReason, Throwable
cause) {
+ String message, CheckpointCloseReason failureReason, Throwable cause) {
super(message + " Failure reason: " + failureReason.message(), cause);
- this.checkpointFailureReason = checkNotNull(failureReason);
+ this.checkpointCloseReason = checkNotNull(failureReason);
}
- public CheckpointFailureReason getCheckpointFailureReason() {
- return checkpointFailureReason;
+ public CheckpointCloseReason getCheckpointFailureReason() {
+ return checkpointCloseReason;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index ab42cf79b..7cc6d13b8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -137,9 +137,8 @@ public class CheckpointManager {
return getCheckpointCoordinator(pipelineId).startSavepoint();
}
- public void reportedPipelineRunning(int pipelineId) {
- getCheckpointCoordinator(pipelineId).setAllTaskReady(true);
- getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint();
+ public void reportedPipelineRunning(int pipelineId, boolean
alreadyStarted) {
+
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
}
protected void handleCheckpointError(int pipelineId, Throwable e) {
@@ -182,7 +181,7 @@ public class CheckpointManager {
* <br> Listen to the {@link PipelineStatus} of the {@link SubPlan}, which
is used to cancel the running {@link PendingCheckpoint} when the SubPlan is
abnormal.
*/
public CompletableFuture<Void> listenPipelineRetry(int pipelineId,
PipelineStatus pipelineStatus) {
-
getCheckpointCoordinator(pipelineId).cleanPendingCheckpoint(CheckpointFailureReason.PIPELINE_END);
+
getCheckpointCoordinator(pipelineId).cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END);
return CompletableFuture.completedFuture(null);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 3fff60ad9..23ad0aa59 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -157,9 +157,13 @@ public class PendingCheckpoint implements Checkpoint {
taskStatistics);
}
- public void abortCheckpoint(CheckpointFailureReason failureReason,
+ public void abortCheckpoint(CheckpointCloseReason closedReason,
@Nullable Throwable cause) {
- this.failureCause = new CheckpointException(failureReason, cause);
- completableFuture.completeExceptionally(failureCause);
+ if
(closedReason.equals(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET)) {
+ completableFuture.complete(null);
+ } else {
+ this.failureCause = new CheckpointException(closedReason, cause);
+ completableFuture.completeExceptionally(failureCause);
+ }
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 87143874f..49744bbd3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -415,6 +415,7 @@ public class PhysicalVertex {
}
}
}
+ this.taskFuture.complete(new TaskExecutionState(taskGroupLocation,
ExecutionState.CANCELED, null));
}
private void updateStateTimestamps(@NonNull ExecutionState targetState) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 5bf859158..b1fc23b3b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -338,6 +338,7 @@ public class SubPlan {
forcePipelineFinish();
return;
}
+
jobMaster.getCheckpointManager().reportedPipelineRunning(pipelineId, false);
reSchedulerPipelineFuture =
jobMaster.reSchedulerPipeline(this);
if (reSchedulerPipelineFuture != null) {
reSchedulerPipelineFuture.join();
@@ -371,7 +372,7 @@ public class SubPlan {
} else if (PipelineStatus.CANCELING.equals(getPipelineState())) {
cancelPipelineTasks();
} else if (PipelineStatus.RUNNING.equals(getPipelineState())) {
-
jobMaster.getCheckpointManager().reportedPipelineRunning(this.getPipelineLocation().getPipelineId());
+
jobMaster.getCheckpointManager().reportedPipelineRunning(this.getPipelineLocation().getPipelineId(),
true);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index a7ba48ce6..755d6e4b8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -28,8 +28,8 @@ import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointException;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointFailureReason;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -123,7 +123,7 @@ public class JobMasterTest extends
AbstractSeaTunnelServerTest {
.untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING,
jobMaster.getJobStatus()));
// call checkpoint timeout
- jobMaster.handleCheckpointError(1, new
CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+ jobMaster.handleCheckpointError(1, new
CheckpointException(CheckpointCloseReason.CHECKPOINT_EXPIRED));
// Because handleCheckpointTimeout is an async method, so we need
sleep 5s to waiting job status become running again
Thread.sleep(5000);