zhuzhurk commented on code in PR #24771:
URL: https://github.com/apache/flink/pull/24771#discussion_r1609398720
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
Review Comment:
Is it possible to start the recovery immediately once all recorded finished
partitions have been recovered, rather than always waiting for the full
`previousWorkerRecoveryTimeout`?
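For illustration only, one way such an early start could be wired up. This assumes a
hypothetical `getPartitionsRecoveredFuture()` on the handler that completes once all
recorded finished partitions have been reported back; that method does not exist in
this PR:

```java
// Race partition recovery against the timeout so recovery can begin early.
CompletableFuture<Void> allPartitionsRecovered =
        jobRecoveryHandler.getPartitionsRecoveredFuture(); // hypothetical

CompletableFuture<Void> timeout = new CompletableFuture<>();
getMainThreadExecutor()
        .schedule(
                () -> timeout.complete(null),
                previousWorkerRecoveryTimeout.toMillis(),
                TimeUnit.MILLISECONDS);

// Whichever completes first triggers the actual recovery exactly once.
CompletableFuture.anyOf(allPartitionsRecovered, timeout)
        .thenRunAsync(
                () ->
                        jobRecoveryHandler.startRecovering(
                                this::onRecoveringFinished, this::onRecoveringFailed),
                getMainThreadExecutor());
```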
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            tryComputeSourceParallelismThenRunAsync(
+                    (Void value, Throwable throwable) -> {
+                        if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                            initializeVerticesIfPossible();
+                            super.startSchedulingInternal();
+                        }
+                    });
+        }
+    }
+
+    @Override
+    protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+        FailureHandlingResult wrappedResult = failureHandlingResult;
+        if (failureHandlingResult.canRestart()) {
+            Set<ExecutionVertexID> originalNeedToRestartVertices =
+                    failureHandlingResult.getVerticesToRestart();
+
+            Set<JobVertexID> extraNeedToRestartJobVertices =
+                    originalNeedToRestartVertices.stream()
+                            .map(ExecutionVertexID::getJobVertexId)
+                            .filter(requiredRestartJobVertices::contains)
+                            .collect(Collectors.toSet());
+
+            requiredRestartJobVertices.removeAll(extraNeedToRestartJobVertices);
+
+            Set<ExecutionVertexID> needToRestartVertices =
+                    extraNeedToRestartJobVertices.stream()
+                            .flatMap(
+                                    jobVertexId -> {
+                                        ExecutionJobVertex jobVertex =
+                                                getExecutionJobVertex(jobVertexId);
+                                        return Arrays.stream(jobVertex.getTaskVertices())
+                                                .map(ExecutionVertex::getID);
+                                    })
+                            .collect(Collectors.toSet());
+            needToRestartVertices.addAll(originalNeedToRestartVertices);
+
+            wrappedResult =
+                    FailureHandlingResult.restartable(
+                            failureHandlingResult.getFailedExecution().orElse(null),
+                            failureHandlingResult.getError(),
+                            failureHandlingResult.getTimestamp(),
+                            failureHandlingResult.getFailureLabels(),
+                            needToRestartVertices,
+                            failureHandlingResult.getRestartDelayMS(),
+                            failureHandlingResult.isGlobalFailure(),
+                            failureHandlingResult.isRootCause());
+        }
+
+        super.maybeRestartTasks(wrappedResult);
+    }
+
+    @VisibleForTesting
+    boolean isRecovering() {
+        return jobRecoveryHandler.isRecovering();
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {
+        boolean success = super.updateTaskExecutionState(taskExecutionState);
+
+        if (success
+                && taskExecutionState.getExecutionState() == ExecutionState.FINISHED
+                && !isRecovering()) {
+            final ExecutionVertexID executionVertexId =
+                    taskExecutionState.getID().getExecutionVertexId();
+            jobRecoveryHandler.notifyExecutionFinished(executionVertexId, taskExecutionState);
+        }
+        return success;
+    }
+
+    @Override
+    protected void resetForNewExecutions(Collection<ExecutionVertexID> vertices) {
+        super.resetForNewExecutions(vertices);
+        if (!isRecovering()) {
+            jobRecoveryHandler.notifyExecutionVertexReset(vertices);
+        }
+    }
+
+    private void initializeJobVertex(
+            ExecutionJobVertex jobVertex,
+            int parallelism,
+            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos,
+            long createTimestamp)
+            throws JobException {
+        if (!jobVertex.isParallelismDecided()) {
+            changeJobVertexParallelism(jobVertex, parallelism);
+        } else {
+            checkState(parallelism == jobVertex.getParallelism());
+        }
+        checkState(canInitialize(jobVertex));
+        getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp, jobVertexInputInfos);
+        if (!isRecovering()) {
+            jobRecoveryHandler.notifyExecutionJobVertexInitialization(
+                    jobVertex.getJobVertex().getID(), parallelism, jobVertexInputInfos);
+        }
+    }
+
+    private void resetVerticesInRecovering(Set<ExecutionVertexID> verticesToReset)
+            throws Exception {
+        for (ExecutionVertexID executionVertexID : verticesToReset) {
+            notifyCoordinatorsAboutTaskFailure(
+                    getExecutionVertex(executionVertexID).getCurrentExecutionAttempt(), null);
+        }
+        resetForNewExecutions(verticesToReset);
+        restoreState(verticesToReset, false);
+    }
+
+    private void onRecoveringFinished(Set<JobVertexID> requiredRestartJobVertices) {
Review Comment:
Is it possible to avoid depending on the handler to provide this
`requiredRestartJobVertices`, and instead continue the scheduling based on the
current topology status?
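A minimal sketch of that alternative, assuming the scheduler keeps a reference to its
`SchedulingStrategy` and that the strategy offers the `scheduleAllVerticesIfPossible()`
method discussed further down in this thread; both are assumptions, not code from this PR:

```java
// Hypothetical: recovery no longer reports requiredRestartJobVertices. Any vertex whose
// state could not be recovered was already reset via resetVerticesInRecovering(), so
// scheduling can simply resume from whatever the current topology allows.
private void onRecoveringFinished() {
    schedulingStrategy.scheduleAllVerticesIfPossible();
}
```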
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java:
##########
@@ -146,6 +146,29 @@ private void maybeScheduleVertices(final Set<ExecutionVertexID> vertices) {
         scheduledVertices.addAll(verticesToSchedule);
     }
+
+    @Override
+    public void scheduleVerticesAfterRecovering() {
+        final Set<ExecutionVertexID> verticesToSchedule = new HashSet<>();
+
+        Set<ExecutionVertexID> nextVertices =
+                newVertices.stream()
Review Comment:
What if the graph was created in a static way?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryHandler.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.QuadConsumerWithException;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/** Interface for handling batch job recovery. */
+public interface BatchJobRecoveryHandler {
+
+    /**
+     * Stops the job recovery handler and optionally clears up.
+     *
+     * @param clearUp whether to clear up.
+     */
+    void stop(boolean clearUp);
+
+    /**
+     * Starts the recovery process and sets up listeners for recovery completion or failure.
+     *
+     * @param recoverFinishedListener a listener called in case recovery finished.
+     * @param recoverFailedListener a runnable called in case recovery fails.
+     */
+    void startRecovering(
+            Consumer<Set<JobVertexID>> recoverFinishedListener, Runnable recoverFailedListener);
+
+    /** Determines whether the job needs to undergo recovery. */
+    boolean needRecover();
+
+    /** Determines whether the job is recovering. */
+    boolean isRecovering();
+
+    /**
+     * Notifies that a set of execution vertices have been reset.
+     *
+     * @param vertices a collection of execution vertex IDs that have been reset.
+     */
+    void notifyExecutionVertexReset(Collection<ExecutionVertexID> vertices);
+
+    /**
+     * Notifies the initialization of a job vertex.
+     *
+     * @param jobVertexId the ID of the job vertex being initialized.
+     * @param parallelism the parallelism degree of the job vertex.
+     * @param jobVertexInputInfos a map of intermediate dataset IDs to job vertex input info.
+     */
+    void notifyExecutionJobVertexInitialization(
+            JobVertexID jobVertexId,
+            int parallelism,
+            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos);
+
+    /** Notifies that an execution has finished. */
+    void notifyExecutionFinished(
+            ExecutionVertexID executionVertexId, TaskExecutionStateTransition taskExecutionState);
+
+    /** Initializes the recovery handler with the necessary components. */
+    void initialize(
+            Logger log,
+            ExecutionGraph executionGraph,
+            ShuffleMaster<?> shuffleMaster,
Review Comment:
It's better to introduce a context to host all the methods needed for recovery.
This would allow each needed method to be well documented, instead of passing
`Consumer`/`QuadConsumerWithException`/`Runnable`/... around.
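A minimal sketch of such a context, assuming the name `BatchJobRecoveryContext` and this
particular method set; both are illustrative, not part of the PR (the `failJob` signature
in particular is simplified):

```java
package org.apache.flink.runtime.scheduler.adaptivebatch;

import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical context giving the recovery handler access to scheduler-side operations. */
public interface BatchJobRecoveryContext {

    /** Returns the execution graph of the job under recovery. */
    ExecutionGraph getExecutionGraph();

    /** Fails the job with the given cause. */
    void failJob(Throwable cause);

    /** Resets the given execution vertices so they can be re-run during recovery. */
    void resetVerticesInRecovering(Set<ExecutionVertexID> verticesToReset) throws Exception;

    /** Updates the result-partition-bytes metrics recorded for recovered partitions. */
    void updateResultPartitionBytesMetrics(
            Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes);

    /** Initializes a job vertex with the recovered parallelism and input infos. */
    void initializeJobVertex(
            ExecutionJobVertex jobVertex,
            int parallelism,
            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos,
            long createTimestamp)
            throws JobException;

    /** Notifies the scheduler that newly initialized job vertices should join the topology. */
    void updateTopology(List<ExecutionJobVertex> newlyInitializedJobVertices);
}
```

Each method then carries its own Javadoc, which the bare functional interfaces cannot.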
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -479,7 +640,13 @@ public void initializeVerticesIfPossible() {
                 // ExecutionGraph#initializeJobVertex(ExecutionJobVertex, long) to initialize.
                 // TODO: In the future, if we want to load balance for job vertices whose
                 // parallelism has already been decided, we need to refactor the logic here.
Review Comment:
The comment is outdated.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            tryComputeSourceParallelismThenRunAsync(
+                    (Void value, Throwable throwable) -> {
+                        if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                            initializeVerticesIfPossible();
+                            super.startSchedulingInternal();
+                        }
+                    });
+        }
+    }
+
+    @Override
+    protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
Review Comment:
More comments are needed to describe what this method does and what it is
for.
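As a concrete starting point, a possible Javadoc derived from what the diff does; the
wording is only a suggestion:

```java
/**
 * Handles a failure result and possibly restarts tasks. In addition to the vertices
 * selected by the failover strategy, this override expands the restart set: for every
 * job vertex that recovery marked as requiring a restart
 * ({@code requiredRestartJobVertices}), all of its execution vertices are restarted as
 * well, and the job vertex is then removed from that set.
 */
@Override
protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
    // ... body unchanged from the diff above ...
}
```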
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyBatchJobRecoveryHandler.java:
##########
@@ -47,7 +47,7 @@
 /** A dummy implementation of the {@link BatchJobRecoveryHandler}. */
 public class DummyBatchJobRecoveryHandler implements BatchJobRecoveryHandler {
     @Override
-    public void stopJobEventManager(boolean clearEvents) {}
+    public void stop(boolean clearUp) {}
Review Comment:
`clearUp` -> `cleanUp`
They have different meanings.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            tryComputeSourceParallelismThenRunAsync(
+                    (Void value, Throwable throwable) -> {
+                        if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                            initializeVerticesIfPossible();
+                            super.startSchedulingInternal();
+                        }
+                    });
+        }
+    }
+
+    @Override
+    protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+        FailureHandlingResult wrappedResult = failureHandlingResult;
+        if (failureHandlingResult.canRestart()) {
+            Set<ExecutionVertexID> originalNeedToRestartVertices =
+                    failureHandlingResult.getVerticesToRestart();
+
+            Set<JobVertexID> extraNeedToRestartJobVertices =
+                    originalNeedToRestartVertices.stream()
+                            .map(ExecutionVertexID::getJobVertexId)
+                            .filter(requiredRestartJobVertices::contains)
+                            .collect(Collectors.toSet());
+
+            requiredRestartJobVertices.removeAll(extraNeedToRestartJobVertices);
+
+            Set<ExecutionVertexID> needToRestartVertices =
+                    extraNeedToRestartJobVertices.stream()
+                            .flatMap(
+                                    jobVertexId -> {
+                                        ExecutionJobVertex jobVertex =
+                                                getExecutionJobVertex(jobVertexId);
+                                        return Arrays.stream(jobVertex.getTaskVertices())
+                                                .map(ExecutionVertex::getID);
+                                    })
+                            .collect(Collectors.toSet());
+            needToRestartVertices.addAll(originalNeedToRestartVertices);
+
+            wrappedResult =
+                    FailureHandlingResult.restartable(
+                            failureHandlingResult.getFailedExecution().orElse(null),
+                            failureHandlingResult.getError(),
+                            failureHandlingResult.getTimestamp(),
+                            failureHandlingResult.getFailureLabels(),
+                            needToRestartVertices,
+                            failureHandlingResult.getRestartDelayMS(),
+                            failureHandlingResult.isGlobalFailure(),
+                            failureHandlingResult.isRootCause());
+        }
+
+        super.maybeRestartTasks(wrappedResult);
+    }
+
+    @VisibleForTesting
+    boolean isRecovering() {
+        return jobRecoveryHandler.isRecovering();
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {
+        boolean success = super.updateTaskExecutionState(taskExecutionState);
+
+        if (success
+                && taskExecutionState.getExecutionState() == ExecutionState.FINISHED
+                && !isRecovering()) {
+            final ExecutionVertexID executionVertexId =
+                    taskExecutionState.getID().getExecutionVertexId();
+            jobRecoveryHandler.notifyExecutionFinished(executionVertexId, taskExecutionState);
Review Comment:
How about doing it in `onTaskFinished()`?
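A sketch of that move, assuming `DefaultScheduler#onTaskFinished(Execution, IOMetrics)`
is the hook meant here and that `notifyExecutionFinished` could be adapted to accept the
finished `Execution` instead of a `TaskExecutionStateTransition`:

```java
@Override
protected void onTaskFinished(final Execution execution, final IOMetrics ioMetrics) {
    super.onTaskFinished(execution, ioMetrics);
    if (!isRecovering()) {
        // The recovery handler records the finished execution from the scheduler's own
        // finish hook instead of from updateTaskExecutionState(). This assumes the
        // handler interface is changed to take the Execution itself.
        jobRecoveryHandler.notifyExecutionFinished(execution.getVertex().getID(), execution);
    }
}
```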
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryHandler.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.QuadConsumerWithException;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/** Interface for handling batch job recovery. */
+public interface BatchJobRecoveryHandler {
+
+    /**
+     * Stops the job recovery handler and optionally clears up.
+     *
+     * @param clearUp whether to clear up.
+     */
+    void stop(boolean clearUp);
Review Comment:
It's better to reorganize the order of these methods to make the code easier to read.
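For instance, a lifecycle-based ordering could look like this (signatures as in this PR,
imports omitted; `initialize` is truncated in the quoted diff and therefore elided here too):

```java
/** Interface for handling batch job recovery. */
public interface BatchJobRecoveryHandler {

    // --- lifecycle, in call order ---

    boolean needRecover();

    void startRecovering(
            Consumer<Set<JobVertexID>> recoverFinishedListener, Runnable recoverFailedListener);

    boolean isRecovering();

    void stop(boolean clearUp);

    // --- notifications from the scheduler ---

    void notifyExecutionJobVertexInitialization(
            JobVertexID jobVertexId,
            int parallelism,
            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos);

    void notifyExecutionVertexReset(Collection<ExecutionVertexID> vertices);

    void notifyExecutionFinished(
            ExecutionVertexID executionVertexId, TaskExecutionStateTransition taskExecutionState);
}
```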
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java:
##########
@@ -55,4 +55,9 @@ public interface SchedulingStrategy {
      * @param resultPartitionId The id of the result partition
      */
     void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId);
+
+    /** Called when the batch job recovery is finished. */
+    default void scheduleVerticesAfterRecovering() {
Review Comment:
I prefer to name it `scheduleAllVerticesIfPossible`, and to document that it schedules
all vertices except for finished vertices and vertices whose inputs are not ready.
Job recovery just leverages this capability to resume job scheduling.
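Put together, the rename plus the suggested documentation would read roughly like:

```java
/**
 * Schedules all vertices that can currently be scheduled, i.e. all vertices except
 * finished ones and those whose inputs are not yet ready. Batch job recovery merely
 * leverages this capability to resume job scheduling after recovery completes.
 */
default void scheduleAllVerticesIfPossible() {}
```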
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
Review Comment:
Is it possible to organize the logic like this:
```
if (!maybeRecoverFromPreviousJobAttempt()) {
    ... start new scheduling ...
}
```
And maybe combine `startRecovering()` and `initialize()` into one method.
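Fleshed out, `startSchedulingInternal()` might then look like this; the combined method
and its boolean return value are the reviewer's proposal, and the exact shape is an
assumption (the handler would own the recovery-timeout scheduling internally):

```java
protected void startSchedulingInternal() {
    speculativeExecutionHandler.init(
            getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);

    // Hypothetical: initialize() and startRecovering() merged into one call that
    // returns false when there is nothing to recover from a previous job attempt.
    if (!jobRecoveryHandler.maybeRecoverFromPreviousJobAttempt(
            this::onRecoveringFinished, this::onRecoveringFailed)) {
        // Start new scheduling.
        tryComputeSourceParallelismThenRunAsync(
                (Void value, Throwable throwable) -> {
                    if (getExecutionGraph().getState() == JobStatus.CREATED) {
                        initializeVerticesIfPossible();
                        super.startSchedulingInternal();
                    }
                });
    }
}
```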