This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f87199cf8 fix job error not right bug (#4355)
f87199cf8 is described below
commit f87199cf81a2e5d2c0a08fa113a866a64cf40e73
Author: Eric <[email protected]>
AuthorDate: Thu Mar 16 13:42:54 2023 +0800
fix job error not right bug (#4355)
---
.../apache/seatunnel/engine/server/dag/physical/SubPlan.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index b72a789e8..1a8700344 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -196,8 +196,8 @@ public class SubPlan {
checkpointCoordinatorState
.getCheckpointCoordinatorStatus())) {
pipelineStatus = PipelineStatus.FAILED;
- errorByPhysicalVertex.set(
- checkpointCoordinatorState.getThrowableMsg());
+ errorByPhysicalVertex.compareAndSet(
+ null, checkpointCoordinatorState.getThrowableMsg());
}
} else {
pipelineStatus = PipelineStatus.FINISHED;
@@ -211,14 +211,14 @@ public class SubPlan {
checkpointCoordinatorState
.getCheckpointCoordinatorStatus())) {
pipelineStatus = PipelineStatus.FAILED;
- errorByPhysicalVertex.set(
- checkpointCoordinatorState.getThrowableMsg());
+ errorByPhysicalVertex.compareAndSet(
+ null, checkpointCoordinatorState.getThrowableMsg());
} else if (CheckpointCoordinatorStatus.CANCELED.equals(
checkpointCoordinatorState
.getCheckpointCoordinatorStatus())) {
pipelineStatus = PipelineStatus.CANCELED;
- errorByPhysicalVertex.set(
- checkpointCoordinatorState.getThrowableMsg());
+ errorByPhysicalVertex.compareAndSet(
+ null, checkpointCoordinatorState.getThrowableMsg());
}
}