This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7ed372897 [HotFix][Zeta] fix after the savepoint job is restored, the
checkpoint file cannot be generated #4985 (#5051)
7ed372897 is described below
commit 7ed372897930cb4d5ad40907d7099358b5839b59
Author: wu-a-ge <[email protected]>
AuthorDate: Tue Jul 11 22:24:53 2023 +0800
[HotFix][Zeta] fix after the savepoint job is restored, the checkpoint file
cannot be generated #4985 (#5051)
* fix after the savepoint job is restored, the checkpoint file cannot be
generated
---
.../engine/server/checkpoint/CheckpointCoordinator.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 3ae265850..9e0ef2a53 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -183,8 +183,19 @@ public class CheckpointCoordinator {
this.checkpointIdCounter = checkpointIdCounter;
this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
if (pipelineState != null) {
- this.latestCompletedCheckpoint =
+ // fix after the savepoint job is restored, the checkpoint file cannot be generated
+ CompletedCheckpoint tmpCheckpoint =
serializer.deserialize(pipelineState.getStates(),
CompletedCheckpoint.class);
+ this.latestCompletedCheckpoint =
+ new CompletedCheckpoint(
+ tmpCheckpoint.getJobId(),
+ tmpCheckpoint.getPipelineId(),
+ tmpCheckpoint.getCheckpointId(),
+ tmpCheckpoint.getCheckpointTimestamp(),
+ CheckpointType.CHECKPOINT_TYPE,
+ tmpCheckpoint.getCompletedTimestamp(),
+ tmpCheckpoint.getTaskStates(),
+ tmpCheckpoint.getTaskStatistics());
}
this.checkpointCoordinatorFuture = new CompletableFuture();