This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bc28eb1747 [Improve][Zeta] Move checkpoint notify complete in checkpoint stage (#5185)
bc28eb1747 is described below
commit bc28eb1747d880babe7e04b8b399caf20da8f74b
Author: Jia Fan <[email protected]>
AuthorDate: Mon Jul 31 14:50:38 2023 +0800
[Improve][Zeta] Move checkpoint notify complete in checkpoint stage (#5185)
---
.../engine/server/checkpoint/CheckpointCoordinator.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 0f9e03df34..e6b3c3d27a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -695,14 +695,6 @@ public class CheckpointCoordinator {
completedCheckpoint.getCheckpointTimestamp(),
completedCheckpoint.getCompletedTimestamp());
final long checkpointId = completedCheckpoint.getCheckpointId();
- pendingCheckpoints.remove(checkpointId);
- pendingCounter.decrementAndGet();
- if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
- // latest checkpoint completed time > checkpoint interval
- if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
- scheduleTriggerPendingCheckpoint(0L);
- }
- }
completedCheckpoints.addLast(completedCheckpoint);
try {
byte[] states = serializer.serialize(completedCheckpoint);
@@ -742,6 +734,14 @@ public class CheckpointCoordinator {
completedCheckpoint.getJobId());
latestCompletedCheckpoint = completedCheckpoint;
notifyCompleted(completedCheckpoint);
+ pendingCheckpoints.remove(checkpointId);
+ pendingCounter.decrementAndGet();
+ if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
+ // latest checkpoint completed time > checkpoint interval
+ if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
+ scheduleTriggerPendingCheckpoint(0L);
+ }
+ }
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
if (latestCompletedCheckpoint.getCheckpointType().equals(SAVEPOINT_TYPE)) {