This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7519817630 [Improve][Zeta] Improve CheckpointCoordinator notify complete when restore (#5136)
7519817630 is described below
commit 75198176306fd7184d308da01a0ee33a3ef70c64
Author: Jia Fan <[email protected]>
AuthorDate: Mon Jul 24 10:52:22 2023 +0800
[Improve][Zeta] Improve CheckpointCoordinator notify complete when restore (#5136)
* [Improve] Improve CheckpointCoordinator notify complete when restore
* update
---
.../server/checkpoint/CheckpointCloseReason.java | 3 ++-
.../server/checkpoint/CheckpointCoordinator.java | 25 ++++++++++++++++++----
.../server/checkpoint/CheckpointManager.java | 4 ++--
.../seatunnel/engine/server/master/JobMaster.java | 5 ++++-
.../engine/server/master/JobMasterTest.java | 2 +-
5 files changed, 30 insertions(+), 9 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index ae1af4d41f..9f35f62fd6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -26,7 +26,8 @@ public enum CheckpointCloseReason {
CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error."),
AGGREGATE_COMMIT_ERROR("Aggregate commit error."),
- TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error");
+ TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error"),
+ CHECKPOINT_NOTIFY_COMPLETE_FAILED("Checkpoint notify complete failed");
private final String message;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 9e0ef2a53a..9bf0e77069 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -272,7 +272,8 @@ public class CheckpointCoordinator {
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(
CheckpointCoordinatorStatus.FAILED,
errorByPhysicalVertex.get()));
- checkpointManager.handleCheckpointError(pipelineId);
+ checkpointManager.handleCheckpointError(
+ pipelineId, reason.equals(CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED));
}
private void restoreTaskState(TaskLocation taskLocation) {
@@ -316,9 +317,26 @@ public class CheckpointCoordinator {
isAllTaskReady = true;
InvocationFuture<?>[] futures = notifyTaskStart();
CompletableFuture.allOf(futures).join();
+ notifyCompleted(latestCompletedCheckpoint);
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
}
+ private void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
+ if (completedCheckpoint != null) {
+ try {
+ LOG.info("start notify checkpoint completed, checkpoint:{}", completedCheckpoint);
+ InvocationFuture<?>[] invocationFutures =
+ notifyCheckpointCompleted(completedCheckpoint.getCheckpointId());
+ CompletableFuture.allOf(invocationFutures).join();
+ } catch (Throwable e) {
+ handleCoordinatorError(
+ "notify checkpoint completed failed",
+ e,
+ CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED);
+ }
+ }
+ }
+
public InvocationFuture<?>[] notifyTaskStart() {
return plan.getPipelineSubtasks().stream()
.map(NotifyTaskStartOperation::new)
@@ -358,6 +376,7 @@ public class CheckpointCoordinator {
shutdown = false;
if (alreadyStarted) {
isAllTaskReady = true;
+ notifyCompleted(latestCompletedCheckpoint);
tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
} else {
isAllTaskReady = false;
@@ -719,10 +738,8 @@ public class CheckpointCoordinator {
completedCheckpoint.getCheckpointId(),
completedCheckpoint.getPipelineId(),
completedCheckpoint.getJobId());
- InvocationFuture<?>[] invocationFutures = notifyCheckpointCompleted(checkpointId);
- CompletableFuture.allOf(invocationFutures).join();
- // TODO: notifyCheckpointCompleted fail
latestCompletedCheckpoint = completedCheckpoint;
+ notifyCompleted(completedCheckpoint);
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
if (latestCompletedCheckpoint.getCheckpointType().equals(SAVEPOINT_TYPE)) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index f34ae2f6a0..0c5a91698e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -174,8 +174,8 @@ public class CheckpointManager {
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
}
- protected void handleCheckpointError(int pipelineId) {
- jobMaster.handleCheckpointError(pipelineId);
+ protected void handleCheckpointError(int pipelineId, boolean neverRestore) {
+ jobMaster.handleCheckpointError(pipelineId, neverRestore);
}
private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 13b89a69dd..11cc5f21b0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -325,7 +325,10 @@ public class JobMaster {
}
}
- public void handleCheckpointError(long pipelineId) {
+ public void handleCheckpointError(long pipelineId, boolean neverRestore) {
+ if (neverRestore) {
+ this.neverNeedRestore();
+ }
this.physicalPlan
.getPipelineList()
.forEach(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index efdc9e0f71..bb331bd018 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -139,7 +139,7 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
jobMaster.neverNeedRestore();
// call checkpoint timeout
- jobMaster.handleCheckpointError(1);
+ jobMaster.handleCheckpointError(1, false);
PassiveCompletableFuture<JobResult> jobMasterCompleteFuture =
jobMaster.getJobMasterCompleteFuture();