This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0633f6fb82 [Improve][Zeta] Optimize unstable
CheckpointErrorRestoreEndTest cases (#9619)
0633f6fb82 is described below
commit 0633f6fb825720abe0817f65099f55ae4a849036
Author: Jia Fan <[email protected]>
AuthorDate: Mon Jul 28 15:30:04 2025 +0800
[Improve][Zeta] Optimize unstable CheckpointErrorRestoreEndTest cases
(#9619)
---
.../org/apache/seatunnel/engine/server/dag/physical/SubPlan.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index abace69bfe..3d683b3265 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -98,7 +98,7 @@ public class SubPlan {
private PassiveCompletableFuture<Void> reSchedulerPipelineFuture;
- private Integer pipelineRestoreNum;
+ private AtomicInteger pipelineRestoreNum;
private final Object restoreLock = new Object();
@@ -125,7 +125,7 @@ public class SubPlan {
this.pipelineFuture = new CompletableFuture<>();
this.physicalVertexList = physicalVertexList;
this.coordinatorVertexList = coordinatorVertexList;
- pipelineRestoreNum = 0;
+ pipelineRestoreNum = new AtomicInteger();
pipelineMaxRestoreNum =
Integer.parseInt(
jobImmutableInformation
@@ -462,7 +462,7 @@ public class SubPlan {
private boolean prepareRestorePipeline() {
synchronized (restoreLock) {
try {
- pipelineRestoreNum++;
+ pipelineRestoreNum.getAndIncrement();
log.info(
String.format(
"Restore time %s, pipeline %s",
@@ -590,7 +590,7 @@ public class SubPlan {
}
public int getPipelineRestoreNum() {
- return pipelineRestoreNum;
+ return pipelineRestoreNum.get();
}
public void handleCheckpointError() {