This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f87199cf8 fix job error not right bug (#4355)
f87199cf8 is described below

commit f87199cf81a2e5d2c0a08fa113a866a64cf40e73
Author: Eric <[email protected]>
AuthorDate: Thu Mar 16 13:42:54 2023 +0800

    fix job error not right bug (#4355)
---
 .../apache/seatunnel/engine/server/dag/physical/SubPlan.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index b72a789e8..1a8700344 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -196,8 +196,8 @@ public class SubPlan {
                                         checkpointCoordinatorState
                                                 
.getCheckpointCoordinatorStatus())) {
                                     pipelineStatus = PipelineStatus.FAILED;
-                                    errorByPhysicalVertex.set(
-                                            
checkpointCoordinatorState.getThrowableMsg());
+                                    errorByPhysicalVertex.compareAndSet(
+                                            null, 
checkpointCoordinatorState.getThrowableMsg());
                                 }
                             } else {
                                 pipelineStatus = PipelineStatus.FINISHED;
@@ -211,14 +211,14 @@ public class SubPlan {
                                         checkpointCoordinatorState
                                                 
.getCheckpointCoordinatorStatus())) {
                                     pipelineStatus = PipelineStatus.FAILED;
-                                    errorByPhysicalVertex.set(
-                                            
checkpointCoordinatorState.getThrowableMsg());
+                                    errorByPhysicalVertex.compareAndSet(
+                                            null, 
checkpointCoordinatorState.getThrowableMsg());
                                 } else if 
(CheckpointCoordinatorStatus.CANCELED.equals(
                                         checkpointCoordinatorState
                                                 
.getCheckpointCoordinatorStatus())) {
                                     pipelineStatus = PipelineStatus.CANCELED;
-                                    errorByPhysicalVertex.set(
-                                            
checkpointCoordinatorState.getThrowableMsg());
+                                    errorByPhysicalVertex.compareAndSet(
+                                            null, 
checkpointCoordinatorState.getThrowableMsg());
                                 }
                             }
 

Reply via email to