This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0633f6fb82 [Improve][Zeta] Optimize unstable 
CheckpointErrorRestoreEndTest cases (#9619)
0633f6fb82 is described below

commit 0633f6fb825720abe0817f65099f55ae4a849036
Author: Jia Fan <[email protected]>
AuthorDate: Mon Jul 28 15:30:04 2025 +0800

    [Improve][Zeta] Optimize unstable CheckpointErrorRestoreEndTest cases 
(#9619)
---
 .../org/apache/seatunnel/engine/server/dag/physical/SubPlan.java  | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index abace69bfe..3d683b3265 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -98,7 +98,7 @@ public class SubPlan {
 
     private PassiveCompletableFuture<Void> reSchedulerPipelineFuture;
 
-    private Integer pipelineRestoreNum;
+    private AtomicInteger pipelineRestoreNum;
 
     private final Object restoreLock = new Object();
 
@@ -125,7 +125,7 @@ public class SubPlan {
         this.pipelineFuture = new CompletableFuture<>();
         this.physicalVertexList = physicalVertexList;
         this.coordinatorVertexList = coordinatorVertexList;
-        pipelineRestoreNum = 0;
+        pipelineRestoreNum = new AtomicInteger();
         pipelineMaxRestoreNum =
                 Integer.parseInt(
                         jobImmutableInformation
@@ -462,7 +462,7 @@ public class SubPlan {
     private boolean prepareRestorePipeline() {
         synchronized (restoreLock) {
             try {
-                pipelineRestoreNum++;
+                pipelineRestoreNum.getAndIncrement();
                 log.info(
                         String.format(
                                 "Restore time %s, pipeline %s",
@@ -590,7 +590,7 @@ public class SubPlan {
     }
 
     public int getPipelineRestoreNum() {
-        return pipelineRestoreNum;
+        return pipelineRestoreNum.get();
     }
 
     public void handleCheckpointError() {

Reply via email to