This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7ed372897 [HotFix][Zeta] fix after the savepoint job is restored, the 
checkpoint file cannot be generated #4985 (#5051)
7ed372897 is described below

commit 7ed372897930cb4d5ad40907d7099358b5839b59
Author: wu-a-ge <[email protected]>
AuthorDate: Tue Jul 11 22:24:53 2023 +0800

    [HotFix][Zeta] fix after the savepoint job is restored, the checkpoint file 
cannot be generated #4985 (#5051)
    
    * fix after the savepoint job is restored, the checkpoint file cannot be 
generated
---
 .../engine/server/checkpoint/CheckpointCoordinator.java     | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 3ae265850..9e0ef2a53 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -183,8 +183,19 @@ public class CheckpointCoordinator {
         this.checkpointIdCounter = checkpointIdCounter;
         this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
         if (pipelineState != null) {
-            this.latestCompletedCheckpoint =
+            // fix after the savepoint job is restored, the checkpoint file 
cannot be generated
+            CompletedCheckpoint tmpCheckpoint =
                     serializer.deserialize(pipelineState.getStates(), 
CompletedCheckpoint.class);
+            this.latestCompletedCheckpoint =
+                    new CompletedCheckpoint(
+                            tmpCheckpoint.getJobId(),
+                            tmpCheckpoint.getPipelineId(),
+                            tmpCheckpoint.getCheckpointId(),
+                            tmpCheckpoint.getCheckpointTimestamp(),
+                            CheckpointType.CHECKPOINT_TYPE,
+                            tmpCheckpoint.getCompletedTimestamp(),
+                            tmpCheckpoint.getTaskStates(),
+                            tmpCheckpoint.getTaskStatistics());
         }
         this.checkpointCoordinatorFuture = new CompletableFuture();
 

Reply via email to