This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d2b1bf419 [Hotfix][Zeta] The pipeline needs to wait for the
CheckpointCoordinator to end (#4272)
d2b1bf419 is described below
commit d2b1bf4193d1a54ad8a65c48a1c161b2acf08942
Author: Eric <[email protected]>
AuthorDate: Mon Mar 6 14:23:18 2023 +0800
[Hotfix][Zeta] The pipeline needs to wait for the CheckpointCoordinator to
end (#4272)
---
.../server/checkpoint/CheckpointCoordinator.java | 33 ++++++++++++-
.../checkpoint/CheckpointCoordinatorState.java | 34 +++++++++++++
.../checkpoint/CheckpointCoordinatorStatus.java | 26 ++++++++++
.../server/checkpoint/CheckpointManager.java | 39 +++++++++------
.../operation/NotifyTaskStartOperation.java | 4 +-
.../engine/server/dag/physical/PhysicalPlan.java | 15 ------
.../engine/server/dag/physical/SubPlan.java | 55 ++++++++++++++++------
.../seatunnel/engine/server/master/JobMaster.java | 4 +-
.../engine/server/master/JobMasterTest.java | 5 +-
9 files changed, 162 insertions(+), 53 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index f7e8626c4..260f09582 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -126,6 +126,8 @@ public class CheckpointCoordinator {
private final ExecutorService executorService;
+ private CompletableFuture<CheckpointCoordinatorState>
checkpointCoordinatorFuture;
+
@SneakyThrows
public CheckpointCoordinator(
CheckpointManager manager,
@@ -168,6 +170,7 @@ public class CheckpointCoordinator {
this.latestCompletedCheckpoint =
serializer.deserialize(pipelineState.getStates(),
CompletedCheckpoint.class);
}
+ this.checkpointCoordinatorFuture = new CompletableFuture();
}
public int getPipelineId() {
@@ -210,8 +213,13 @@ public class CheckpointCoordinator {
}
private void handleCoordinatorError(CheckpointCloseReason reason,
Throwable e) {
+ CheckpointException checkpointException = new
CheckpointException(reason, e);
cleanPendingCheckpoint(reason);
- checkpointManager.handleCheckpointError(pipelineId, new
CheckpointException(reason, e));
+ checkpointCoordinatorFuture.complete(
+ new CheckpointCoordinatorState(
+ CheckpointCoordinatorStatus.FAILED,
+ ExceptionUtils.getMessage(checkpointException)));
+ checkpointManager.handleCheckpointError(pipelineId);
}
private void restoreTaskState(TaskLocation taskLocation) {
@@ -282,6 +290,7 @@ public class CheckpointCoordinator {
protected void restoreCoordinator(boolean alreadyStarted) {
LOG.info("received restore CheckpointCoordinator with alreadyStarted=
" + alreadyStarted);
+ checkpointCoordinatorFuture = new CompletableFuture<>();
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
shutdown = false;
if (alreadyStarted) {
@@ -653,6 +662,8 @@ public class CheckpointCoordinator {
latestCompletedCheckpoint = completedCheckpoint;
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
+ checkpointCoordinatorFuture.complete(
+ new
CheckpointCoordinatorState(CheckpointCoordinatorStatus.FINISHED, null));
}
}
@@ -669,7 +680,8 @@ public class CheckpointCoordinator {
if (latestCompletedCheckpoint == null) {
return false;
}
- return latestCompletedCheckpoint.getCheckpointType() ==
COMPLETED_POINT_TYPE;
+ return latestCompletedCheckpoint.getCheckpointType() ==
COMPLETED_POINT_TYPE
+ || latestCompletedCheckpoint.getCheckpointType() ==
SAVEPOINT_TYPE;
}
public boolean isEndOfSavePoint() {
@@ -678,4 +690,21 @@ public class CheckpointCoordinator {
}
return latestCompletedCheckpoint.getCheckpointType() == SAVEPOINT_TYPE;
}
+
+ public PassiveCompletableFuture<CheckpointCoordinatorState>
+ waitCheckpointCoordinatorComplete() {
+ return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
+ }
+
+ public PassiveCompletableFuture<CheckpointCoordinatorState>
cancelCheckpoint() {
+ // The checkpoint may have already failed before all tasks completed.
+ if (checkpointCoordinatorFuture.isDone()) {
+ return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
+ }
+ cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END);
+ CheckpointCoordinatorState checkpointCoordinatorState =
+ new
CheckpointCoordinatorState(CheckpointCoordinatorStatus.CANCELED, null);
+ checkpointCoordinatorFuture.complete(checkpointCoordinatorState);
+ return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorState.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorState.java
new file mode 100644
index 000000000..f417896df
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import lombok.Getter;
+
+@Getter
+public class CheckpointCoordinatorState {
+
+ private final CheckpointCoordinatorStatus checkpointCoordinatorStatus;
+
+ private final String throwableMsg;
+
+ public CheckpointCoordinatorState(
+ CheckpointCoordinatorStatus checkpointCoordinatorStatus, String
throwableMsg) {
+ this.checkpointCoordinatorStatus = checkpointCoordinatorStatus;
+ this.throwableMsg = throwableMsg;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java
new file mode 100644
index 000000000..86abfea9c
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+public enum CheckpointCoordinatorStatus {
+ FINISHED,
+
+ CANCELED,
+
+ FAILED;
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 3c21e8e4f..3e18a458e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -171,8 +171,8 @@ public class CheckpointManager {
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
}
- protected void handleCheckpointError(int pipelineId, Throwable e) {
- jobMaster.handleCheckpointError(pipelineId, e);
+ protected void handleCheckpointError(int pipelineId) {
+ jobMaster.handleCheckpointError(pipelineId);
}
private CheckpointCoordinator getCheckpointCoordinator(TaskLocation
taskLocation) {
@@ -211,18 +211,6 @@ public class CheckpointManager {
getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
}
- /**
- * Called by the JobMaster. <br>
- * Listen to the {@link PipelineStatus} of the {@link SubPlan}, which is
used to cancel the
- * running {@link PendingCheckpoint} when the SubPlan is abnormal.
- */
- public CompletableFuture<Void> listenPipelineRetry(
- int pipelineId, PipelineStatus pipelineStatus) {
- getCheckpointCoordinator(pipelineId)
- .cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END);
- return CompletableFuture.completedFuture(null);
- }
-
/**
* Called by the JobMaster. <br>
* Listen to the {@link PipelineStatus} of the {@link Pipeline}, which is
used to shut down the
@@ -287,4 +275,27 @@ public class CheckpointManager {
jobMaster.queryTaskGroupAddress(
operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
}
+
+ /**
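+ * Called by the JobMaster. If all tasks were canceled or some task failed,
the JobMaster calls this
+ * method to cancel the checkpoint coordinator.
+ *
+ * @param pipelineId id of the pipeline whose checkpoint coordinator is canceled
+ * @return a future holding the resulting state of the checkpoint coordinator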
+ */
+ public PassiveCompletableFuture<CheckpointCoordinatorState>
cancelCheckpoint(int pipelineId) {
+ return getCheckpointCoordinator(pipelineId).cancelCheckpoint();
+ }
+
+ /**
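+ * Called by the JobMaster. If all tasks are finished, the JobMaster calls
this method to wait
+ * for the checkpoint coordinator to complete.
+ *
+ * @param pipelineId id of the pipeline whose checkpoint coordinator is awaited
+ * @return a future holding the resulting state of the checkpoint coordinator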
+ */
+ public PassiveCompletableFuture<CheckpointCoordinatorState>
waitCheckpointCoordinatorComplete(
+ int pipelineId) {
+ return
getCheckpointCoordinator(pipelineId).waitCheckpointCoordinatorComplete();
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
index b75d4813f..62224aea4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
@@ -59,7 +59,9 @@ public class NotifyTaskStartOperation extends TaskOperation {
true,
exception ->
exception instanceof
TaskGroupContextNotFoundException
- &&
!server.taskIsEnded(taskLocation.getTaskGroupLocation()),
+ || exception instanceof
NullPointerException
+ && !server.taskIsEnded(
+
taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index a0b4fe066..f8d59188b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -139,21 +139,6 @@ public class PhysicalPlan {
future.thenAcceptAsync(
pipelineState -> {
try {
- // Notify checkpoint manager when the pipeline end,
Whether the pipeline
- // will be restarted or not
- LOGGER.info(
- String.format(
- "received pipeline %s callback, state
%s",
- subPlan.getPipelineFullName(),
- pipelineState.getPipelineStatus()));
- if (jobMaster.getCheckpointManager() != null) {
- jobMaster
- .getCheckpointManager()
- .listenPipelineRetry(
-
subPlan.getPipelineLocation().getPipelineId(),
- pipelineState.getPipelineStatus())
- .join();
- }
if
(PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
if (subPlan.canRestorePipeline()) {
subPlan.restorePipeline();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 9e8d6adb8..cc68d8629 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -22,6 +22,8 @@ import
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorState;
+import
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorStatus;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.master.JobMaster;
@@ -92,8 +94,6 @@ public class SubPlan {
private final Object restoreLock = new Object();
- private Throwable checkpointThrowable;
-
public SubPlan(
int pipelineId,
int totalPipelineNum,
@@ -179,17 +179,47 @@ public class SubPlan {
PipelineStatus pipelineStatus = null;
if (failedTaskNum.get() > 0) {
pipelineStatus = PipelineStatus.FAILED;
+ // We don't care about the checkpoint error reason
when a task has
+ // failed.
+ jobMaster
+ .getCheckpointManager()
+ .cancelCheckpoint(getPipelineId())
+ .join();
} else if (canceledTaskNum.get() > 0) {
- if (checkpointThrowable != null) {
+ pipelineStatus = PipelineStatus.CANCELED;
+ CheckpointCoordinatorState
checkpointCoordinatorState =
+ jobMaster
+ .getCheckpointManager()
+
.cancelCheckpoint(getPipelineId())
+ .join();
+ if (CheckpointCoordinatorStatus.FAILED.equals(
+ checkpointCoordinatorState
+
.getCheckpointCoordinatorStatus())) {
pipelineStatus = PipelineStatus.FAILED;
errorByPhysicalVertex.set(
-
ExceptionUtils.getMessage(checkpointThrowable));
- checkpointThrowable = null;
- } else {
- pipelineStatus = PipelineStatus.CANCELED;
+
checkpointCoordinatorState.getThrowableMsg());
}
} else {
pipelineStatus = PipelineStatus.FINISHED;
+ CheckpointCoordinatorState
checkpointCoordinatorState =
+ jobMaster
+ .getCheckpointManager()
+
.waitCheckpointCoordinatorComplete(getPipelineId())
+ .join();
+
+ if (CheckpointCoordinatorStatus.FAILED.equals(
+ checkpointCoordinatorState
+
.getCheckpointCoordinatorStatus())) {
+ pipelineStatus = PipelineStatus.FAILED;
+ errorByPhysicalVertex.set(
+
checkpointCoordinatorState.getThrowableMsg());
+ } else if
(CheckpointCoordinatorStatus.CANCELED.equals(
+ checkpointCoordinatorState
+
.getCheckpointCoordinatorStatus())) {
+ pipelineStatus = PipelineStatus.CANCELED;
+ errorByPhysicalVertex.set(
+
checkpointCoordinatorState.getThrowableMsg());
+ }
}
if (!checkNeedRestore(pipelineStatus)) {
@@ -339,10 +369,7 @@ public class SubPlan {
private void cancelCheckpointCoordinator() {
if (jobMaster.getCheckpointManager() != null) {
- jobMaster
- .getCheckpointManager()
- .listenPipelineRetry(pipelineId, PipelineStatus.CANCELING)
- .join();
+
jobMaster.getCheckpointManager().cancelCheckpoint(pipelineId).join();
}
}
@@ -524,12 +551,10 @@ public class SubPlan {
return pipelineRestoreNum;
}
- public void handleCheckpointError(Throwable e) {
+ public void handleCheckpointError() {
LOGGER.warning(
String.format(
- "%s checkpoint have error, cancel the pipeline",
getPipelineFullName()),
- e);
- this.checkpointThrowable = e;
+ "%s checkpoint have error, cancel the pipeline",
getPipelineFullName()));
this.cancelPipeline();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 302b2bca6..0b840e22c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -319,13 +319,13 @@ public class JobMaster {
}
}
- public void handleCheckpointError(long pipelineId, Throwable e) {
+ public void handleCheckpointError(long pipelineId) {
this.physicalPlan
.getPipelineList()
.forEach(
pipeline -> {
if (pipeline.getPipelineLocation().getPipelineId()
== pipelineId) {
- pipeline.handleCheckpointError(e);
+ pipeline.handleCheckpointError();
}
});
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 655c8e8a6..7e25f0c1c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -27,8 +27,6 @@ import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointException;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -136,8 +134,7 @@ public class JobMasterTest extends
AbstractSeaTunnelServerTest {
() -> Assertions.assertEquals(JobStatus.RUNNING,
jobMaster.getJobStatus()));
// call checkpoint timeout
- jobMaster.handleCheckpointError(
- 1, new
CheckpointException(CheckpointCloseReason.CHECKPOINT_EXPIRED));
+ jobMaster.handleCheckpointError(1);
// Because handleCheckpointTimeout is an async method, so we need
sleep 5s to waiting job
// status become running again