This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e05ceb798 [Hotfix] Fix job error message is not right bug (#4463)
e05ceb798 is described below
commit e05ceb798d0860acbd1aba1f1bbd0937bfccb2c3
Author: Eric <[email protected]>
AuthorDate: Fri Mar 31 14:51:33 2023 +0800
[Hotfix] Fix job error message is not right bug (#4463)
---
config/log4j2.properties | 3 +++
.../engine/server/checkpoint/CheckpointCoordinator.java | 11 ++++++++---
.../apache/seatunnel/engine/server/dag/physical/SubPlan.java | 4 +++-
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/config/log4j2.properties b/config/log4j2.properties
index 8b8d47db8..fb1a07c6d 100644
--- a/config/log4j2.properties
+++ b/config/log4j2.properties
@@ -27,6 +27,9 @@ rootLogger.level = INFO
logger.zeta.name=org.apache.seatunnel.engine
logger.zeta.level=INFO
+logger.debezium.name=io.debezium.connector
+logger.debezium.level=WARN
+
############################ log output to console #############################
#rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
#rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 42ff998b9..ea3cfd689 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
@@ -127,6 +128,8 @@ public class CheckpointCoordinator {
private CompletableFuture<CheckpointCoordinatorState>
checkpointCoordinatorFuture;
+ private AtomicReference<String> errorByPhysicalVertex = new
AtomicReference<>();
+
@SneakyThrows
public CheckpointCoordinator(
CheckpointManager manager,
@@ -212,15 +215,16 @@ public class CheckpointCoordinator {
}
private void handleCoordinatorError(CheckpointCloseReason reason,
Throwable e) {
+ CheckpointException checkpointException = new
CheckpointException(reason, e);
+ errorByPhysicalVertex.compareAndSet(null,
ExceptionUtils.getMessage(checkpointException));
+
if (checkpointCoordinatorFuture.isDone()) {
return;
}
- CheckpointException checkpointException = new
CheckpointException(reason, e);
cleanPendingCheckpoint(reason);
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(
- CheckpointCoordinatorStatus.FAILED,
- ExceptionUtils.getMessage(checkpointException)));
+ CheckpointCoordinatorStatus.FAILED,
errorByPhysicalVertex.get()));
checkpointManager.handleCheckpointError(pipelineId);
}
@@ -295,6 +299,7 @@ public class CheckpointCoordinator {
protected void restoreCoordinator(boolean alreadyStarted) {
LOG.info("received restore CheckpointCoordinator with alreadyStarted=
" + alreadyStarted);
+ errorByPhysicalVertex = new AtomicReference<>();
checkpointCoordinatorFuture = new CompletableFuture<>();
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
shutdown = false;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 1a8700344..d11ad63ee 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -82,7 +82,7 @@ public class SubPlan {
private final PipelineLocation pipelineLocation;
/** The error throw by physicalVertex, should be set when physicalVertex
throw error. */
- private final AtomicReference<String> errorByPhysicalVertex = new
AtomicReference<>();
+ private AtomicReference<String> errorByPhysicalVertex = new
AtomicReference<>();
private final ExecutorService executorService;
@@ -140,6 +140,8 @@ public class SubPlan {
}
public synchronized PassiveCompletableFuture<PipelineExecutionState>
initStateFuture() {
+ // reset errorByPhysicalVertex when restore pipeline
+ errorByPhysicalVertex = new AtomicReference<>();
physicalVertexList.forEach(
physicalVertex -> {
addPhysicalVertexCallBack(physicalVertex.initStateFuture());