wanglijie95 commented on code in PR #20222:
URL: https://github.com/apache/flink/pull/20222#discussion_r919645557
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -324,13 +332,24 @@ private void restartTasksWithDelay(final
FailureHandlingResult failureHandlingRe
.values());
final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
+ if (globalRecovery) {
+ log.info(
+ "{} tasks will be restarted to recover from a global
failure.",
+ verticesToRestart.size());
+ } else {
+
checkArgument(failureHandlingResult.getFailedExecution().isPresent());
+ log.info(
+ "{} tasks will be restarted to recover the failed task
{}.",
Review Comment:
The printed task id was changed from `ExecutionVertexID` to
`ExecutionAttemptID`. But I think it is more reasonable than before.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -220,46 +217,57 @@ protected void startSchedulingInternal() {
}
@Override
- protected void updateTaskExecutionStateInternal(
- final ExecutionVertexID executionVertexId,
- final TaskExecutionStateTransition taskExecutionState) {
+ protected void onTaskExecutionStateUpdate(final Execution execution) {
+ switch (execution.getState()) {
+ case FINISHED:
+ onTaskFinished(execution);
+ break;
+ case FAILED:
+ onTaskFailed(execution);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "State %s should not be notified to
DefaultScheduler.",
+ execution.getState()));
+ }
+ }
+ protected void onTaskFinished(final Execution execution) {
+ checkState(execution.getState() == ExecutionState.FINISHED);
+
+ final ExecutionVertexID executionVertexId =
execution.getVertex().getID();
// once a task finishes, it will release the assigned allocation/slot
and no longer
// needs it. Therefore, it should stop reserving the slot so that
other tasks are
// possible to use the slot. Ideally, the `stopReserveAllocation`
should happen
// along with the release slot process. However, that process is
hidden in the depth
// of the ExecutionGraph, so we currently do it in DefaultScheduler
after that process
// is done.
- if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED)
{
- stopReserveAllocation(executionVertexId);
- }
+ stopReserveAllocation(executionVertexId);
- schedulingStrategy.onExecutionStateChange(
- executionVertexId, taskExecutionState.getExecutionState());
- maybeHandleTaskFailure(taskExecutionState,
getCurrentExecutionOfVertex(executionVertexId));
+ schedulingStrategy.onExecutionStateChange(executionVertexId,
ExecutionState.FINISHED);
Review Comment:
Currently, `SchedulingStrategy#onExecutionStateChange` is only called when a
task finishes. How about changing it to `onExecutionFinish`, or calling it for
all state changes?
Otherwise, it may confuse subsequent developers.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java:
##########
@@ -85,6 +85,10 @@ public Collection<Execution> getCurrentExecutions() {
return Collections.unmodifiableCollection(currentExecutions.values());
}
+ public Execution getCurrentExecutionOrThrow(final ExecutionAttemptID
attemptId) {
Review Comment:
This method is never used.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import
org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import
org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.topology.Vertex;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The speculative scheduler. */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+ implements SlowTaskDetectorListener {
+
+ private final int maxConcurrentExecutions;
+
+ private final Duration blockSlowNodeDuration;
+
+ private final BlocklistHandler blocklistHandler;
+
+ private final SlowTaskDetector slowTaskDetector;
+
+ public SpeculativeScheduler(
+ final Logger log,
+ final JobGraph jobGraph,
+ final Executor ioExecutor,
+ final Configuration jobMasterConfiguration,
+ final Consumer<ComponentMainThreadExecutor> startUpAction,
+ final ScheduledExecutor delayExecutor,
+ final ClassLoader userCodeLoader,
+ final CheckpointsCleaner checkpointsCleaner,
+ final CheckpointRecoveryFactory checkpointRecoveryFactory,
+ final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ final SchedulingStrategyFactory schedulingStrategyFactory,
+ final FailoverStrategy.Factory failoverStrategyFactory,
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ final ExecutionOperations executionOperations,
+ final ExecutionVertexVersioner executionVertexVersioner,
+ final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+ long initializationTimestamp,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final JobStatusListener jobStatusListener,
+ final ExecutionGraphFactory executionGraphFactory,
+ final ShuffleMaster<?> shuffleMaster,
+ final Time rpcTimeout,
+ final VertexParallelismDecider vertexParallelismDecider,
+ final int defaultMaxParallelism,
+ final BlocklistHandler blocklistHandler)
+ throws Exception {
+
+ super(
+ log,
+ jobGraph,
+ ioExecutor,
+ jobMasterConfiguration,
+ startUpAction,
+ delayExecutor,
+ userCodeLoader,
+ checkpointsCleaner,
+ checkpointRecoveryFactory,
+ jobManagerJobMetricGroup,
+ schedulingStrategyFactory,
+ failoverStrategyFactory,
+ restartBackoffTimeStrategy,
+ executionOperations,
+ executionVertexVersioner,
+ executionSlotAllocatorFactory,
+ initializationTimestamp,
+ mainThreadExecutor,
+ jobStatusListener,
+ executionGraphFactory,
+ shuffleMaster,
+ rpcTimeout,
+ vertexParallelismDecider,
+ defaultMaxParallelism);
+
+ this.maxConcurrentExecutions =
+ jobMasterConfiguration.getInteger(
+
JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+ this.blockSlowNodeDuration =
+
jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+ this.blocklistHandler = checkNotNull(blocklistHandler);
+
+ this.slowTaskDetector = new
ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+ }
+
+ @Override
+ protected void startSchedulingInternal() {
+ super.startSchedulingInternal();
+ slowTaskDetector.start(getExecutionGraph(), this,
getMainThreadExecutor());
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> future = super.closeAsync();
+ slowTaskDetector.stop();
+ return future;
+ }
+
+ @Override
+ public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID
executionVertexId) {
+ return (SpeculativeExecutionVertex)
super.getExecutionVertex(executionVertexId);
+ }
+
+ @Override
+ protected void onTaskFinished(final Execution execution) {
+ // cancel all un-terminated executions because the execution vertex
has finished
+
FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+ initializeVerticesIfPossible();
Review Comment:
`initializeVerticesIfPossible` is not needed because it will be called in
`super.onTaskFinished(execution)`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1784,7 +1784,7 @@ private static JobGraph sourceSinkJobGraph(final int
parallelism) {
return JobGraphTestUtils.streamingJobGraph(source, sink);
}
- private static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {
+ public static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {
Review Comment:
This change is not needed.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java:
##########
@@ -83,6 +83,15 @@ public Set<ExecutionVertexID> getUnmodifiedExecutionVertices(
.collect(Collectors.toSet());
}
+ public Map<ExecutionVertexID, ExecutionVertexVersion>
getExecutionVertexVersions(
Review Comment:
What's the difference between this method and `recordVertexModifications` ?
Why do we need this?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -308,7 +308,7 @@ public int getParallelism() {
* @param parallelism The parallelism for the task.
*/
public void setParallelism(int parallelism) {
- if (parallelism < 1) {
+ if (parallelism < 1 && parallelism !=
ExecutionConfig.PARALLELISM_DEFAULT) {
Review Comment:
Will we pass `ExecutionConfig.PARALLELISM_DEFAULT` here ?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java:
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import
org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider;
+import
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import
org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static
org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
+
+/** A builder to create {@link DefaultScheduler} instances for testing. */
+public class DefaultSchedulerBuilder {
Review Comment:
How about renaming it to `SchedulerBuilder` and providing
`buildDefaultScheduler` and `buildAdaptiveBatchScheduler`?
Comments should also be updated.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -324,13 +332,24 @@ private void restartTasksWithDelay(final
FailureHandlingResult failureHandlingRe
.values());
final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
+ if (globalRecovery) {
Review Comment:
I think the changes to the restart log should be in a separate commit (including
the changes in `RestartPipelinedRegionFailoverStrategy`).
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -385,7 +386,8 @@ private SchedulerNG createScheduler(
initializationTimestamp,
getMainThreadExecutor(),
fatalErrorHandler,
- jobStatusListener);
+ jobStatusListener,
+ new NoOpBlocklistHandler());
Review Comment:
The `JobMaster#blocklistHandler` should be passed here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -746,9 +741,7 @@ private boolean isNotifiable(
return false;
}
- protected void updateTaskExecutionStateInternal(
- final ExecutionVertexID executionVertexId,
- final TaskExecutionStateTransition taskExecutionState) {}
+ protected abstract void onTaskExecutionStateUpdate(final Execution
execution);
Review Comment:
How about moving the implementation here and providing two abstract methods,
`onTaskFinished` and `onTaskFailed`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import
org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import
org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.topology.Vertex;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The speculative scheduler. */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+ implements SlowTaskDetectorListener {
+
+ private final int maxConcurrentExecutions;
+
+ private final Duration blockSlowNodeDuration;
+
+ private final BlocklistHandler blocklistHandler;
+
+ private final SlowTaskDetector slowTaskDetector;
+
+ public SpeculativeScheduler(
+ final Logger log,
+ final JobGraph jobGraph,
+ final Executor ioExecutor,
+ final Configuration jobMasterConfiguration,
+ final Consumer<ComponentMainThreadExecutor> startUpAction,
+ final ScheduledExecutor delayExecutor,
+ final ClassLoader userCodeLoader,
+ final CheckpointsCleaner checkpointsCleaner,
+ final CheckpointRecoveryFactory checkpointRecoveryFactory,
+ final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ final SchedulingStrategyFactory schedulingStrategyFactory,
+ final FailoverStrategy.Factory failoverStrategyFactory,
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ final ExecutionOperations executionOperations,
+ final ExecutionVertexVersioner executionVertexVersioner,
+ final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+ long initializationTimestamp,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final JobStatusListener jobStatusListener,
+ final ExecutionGraphFactory executionGraphFactory,
+ final ShuffleMaster<?> shuffleMaster,
+ final Time rpcTimeout,
+ final VertexParallelismDecider vertexParallelismDecider,
+ final int defaultMaxParallelism,
+ final BlocklistHandler blocklistHandler)
+ throws Exception {
+
+ super(
+ log,
+ jobGraph,
+ ioExecutor,
+ jobMasterConfiguration,
+ startUpAction,
+ delayExecutor,
+ userCodeLoader,
+ checkpointsCleaner,
+ checkpointRecoveryFactory,
+ jobManagerJobMetricGroup,
+ schedulingStrategyFactory,
+ failoverStrategyFactory,
+ restartBackoffTimeStrategy,
+ executionOperations,
+ executionVertexVersioner,
+ executionSlotAllocatorFactory,
+ initializationTimestamp,
+ mainThreadExecutor,
+ jobStatusListener,
+ executionGraphFactory,
+ shuffleMaster,
+ rpcTimeout,
+ vertexParallelismDecider,
+ defaultMaxParallelism);
+
+ this.maxConcurrentExecutions =
+ jobMasterConfiguration.getInteger(
+
JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+ this.blockSlowNodeDuration =
+
jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+ this.blocklistHandler = checkNotNull(blocklistHandler);
+
+ this.slowTaskDetector = new
ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+ }
+
+ @Override
+ protected void startSchedulingInternal() {
+ super.startSchedulingInternal();
+ slowTaskDetector.start(getExecutionGraph(), this,
getMainThreadExecutor());
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> future = super.closeAsync();
+ slowTaskDetector.stop();
+ return future;
+ }
+
+ @Override
+ public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID
executionVertexId) {
+ return (SpeculativeExecutionVertex)
super.getExecutionVertex(executionVertexId);
+ }
+
+ @Override
+ protected void onTaskFinished(final Execution execution) {
+ // cancel all un-terminated executions because the execution vertex
has finished
+
FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+ initializeVerticesIfPossible();
+
+ super.onTaskFinished(execution);
+ }
+
+ private CompletableFuture<?> cancelPendingExecutions(
+ final ExecutionVertexID executionVertexId) {
+ final SpeculativeExecutionVertex executionVertex =
getExecutionVertex(executionVertexId);
+
+ final List<CompletableFuture<?>> cancelingFutures = new ArrayList<>();
+ for (Execution execution : executionVertex.getCurrentExecutions()) {
+ if (!execution.getState().isTerminal()) {
+ execution.cancel();
+ cancelingFutures.add(execution.getReleaseFuture());
+ }
+ }
+ cancelAllPendingSlotRequests(executionVertexId);
+ return FutureUtils.combineAll(cancelingFutures);
+ }
+
+ @Override
+ protected void onTaskFailed(final Execution execution) {
+ final SpeculativeExecutionVertex executionVertex =
+ getExecutionVertex(execution.getVertex().getID());
+ final ExecutionAttemptID attemptId = execution.getAttemptId();
+
+ // when an execution fails, remove it from current executions to make
room for future
+ // speculative executions
+ executionVertex.archiveFailedExecution(attemptId);
+ executionSlotAllocator.cancel(attemptId);
+
+ super.onTaskFailed(execution);
+ }
+
+ @Override
+ protected void handleTaskFailure(
+ final Execution failedExecution, @Nullable final Throwable error) {
+
+ final SpeculativeExecutionVertex executionVertex =
+ getExecutionVertex(failedExecution.getVertex().getID());
+
+ // if the execution vertex is not possible finish or a
PartitionException occurred, trigger
+ // an execution vertex failover to recover
+ if (!isExecutionVertexPossibleToFinish(executionVertex)
+ || ExceptionUtils.findThrowable(error,
PartitionException.class).isPresent()) {
+ super.handleTaskFailure(failedExecution, error);
+ } else {
+ // add the execution failure to exception history even though not
restarting the entire
+ // execution vertex
+ final long timestamp = System.currentTimeMillis();
+ setGlobalFailureCause(error, timestamp);
+ final FailureHandlingResult failureHandlingResult =
+ executionFailureHandler.getFailureHandlingResult(
+ failedExecution, error, timestamp);
+ if (failureHandlingResult.canRestart()) {
+ archiveFromFailureHandlingResult(
+
createFailureHandlingResultSnapshot(failureHandlingResult));
+ } else {
+ failJob(error, timestamp);
+ }
+ }
+ }
+
+ private static boolean isExecutionVertexPossibleToFinish(
+ final SpeculativeExecutionVertex executionVertex) {
+ boolean anyExecutionPossibleToFinish = false;
+ for (Execution execution : executionVertex.getCurrentExecutions()) {
+ // if any execution has finished, no execution of the same
execution vertex should fail
+ // after that
+ checkState(execution.getState() != ExecutionState.FINISHED);
+
+ if (execution.getState() == ExecutionState.CREATED
+ || execution.getState() == ExecutionState.SCHEDULED
+ || execution.getState() == ExecutionState.DEPLOYING
+ || execution.getState() == ExecutionState.INITIALIZING
+ || execution.getState() == ExecutionState.RUNNING) {
+ anyExecutionPossibleToFinish = true;
+ }
+ }
+ return anyExecutionPossibleToFinish;
+ }
+
+ @Override
+ protected void cancelAllPendingSlotRequestsInternal() {
Review Comment:
`cancelAllPendingSlotRequestsInternal`,
`cancelAllPendingSlotRequestsForVertices` and `cancelAllPendingSlotRequests` are
not needed.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import
org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import
org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.topology.Vertex;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The speculative scheduler. */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+ implements SlowTaskDetectorListener {
+
+ private final int maxConcurrentExecutions;
+
+ private final Duration blockSlowNodeDuration;
+
+ private final BlocklistHandler blocklistHandler;
+
+ private final SlowTaskDetector slowTaskDetector;
+
+ public SpeculativeScheduler(
+ final Logger log,
+ final JobGraph jobGraph,
+ final Executor ioExecutor,
+ final Configuration jobMasterConfiguration,
+ final Consumer<ComponentMainThreadExecutor> startUpAction,
+ final ScheduledExecutor delayExecutor,
+ final ClassLoader userCodeLoader,
+ final CheckpointsCleaner checkpointsCleaner,
+ final CheckpointRecoveryFactory checkpointRecoveryFactory,
+ final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ final SchedulingStrategyFactory schedulingStrategyFactory,
+ final FailoverStrategy.Factory failoverStrategyFactory,
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ final ExecutionOperations executionOperations,
+ final ExecutionVertexVersioner executionVertexVersioner,
+ final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+ long initializationTimestamp,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final JobStatusListener jobStatusListener,
+ final ExecutionGraphFactory executionGraphFactory,
+ final ShuffleMaster<?> shuffleMaster,
+ final Time rpcTimeout,
+ final VertexParallelismDecider vertexParallelismDecider,
+ final int defaultMaxParallelism,
+ final BlocklistHandler blocklistHandler)
+ throws Exception {
+
+ super(
+ log,
+ jobGraph,
+ ioExecutor,
+ jobMasterConfiguration,
+ startUpAction,
+ delayExecutor,
+ userCodeLoader,
+ checkpointsCleaner,
+ checkpointRecoveryFactory,
+ jobManagerJobMetricGroup,
+ schedulingStrategyFactory,
+ failoverStrategyFactory,
+ restartBackoffTimeStrategy,
+ executionOperations,
+ executionVertexVersioner,
+ executionSlotAllocatorFactory,
+ initializationTimestamp,
+ mainThreadExecutor,
+ jobStatusListener,
+ executionGraphFactory,
+ shuffleMaster,
+ rpcTimeout,
+ vertexParallelismDecider,
+ defaultMaxParallelism);
+
+ this.maxConcurrentExecutions =
+ jobMasterConfiguration.getInteger(
+
JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+ this.blockSlowNodeDuration =
+
jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+ this.blocklistHandler = checkNotNull(blocklistHandler);
+
+ this.slowTaskDetector = new
ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+ }
+
+ @Override
+ protected void startSchedulingInternal() {
+ super.startSchedulingInternal();
+ slowTaskDetector.start(getExecutionGraph(), this,
getMainThreadExecutor());
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> future = super.closeAsync();
+ slowTaskDetector.stop();
+ return future;
+ }
+
+ @Override
+ public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID
executionVertexId) {
+ return (SpeculativeExecutionVertex)
super.getExecutionVertex(executionVertexId);
+ }
+
+ @Override
+ protected void onTaskFinished(final Execution execution) {
+ // cancel all un-terminated executions because the execution vertex
has finished
+
FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+ initializeVerticesIfPossible();
+
+ super.onTaskFinished(execution);
+ }
+
+ private CompletableFuture<?> cancelPendingExecutions(
+ final ExecutionVertexID executionVertexId) {
+ final SpeculativeExecutionVertex executionVertex =
getExecutionVertex(executionVertexId);
+
+ final List<CompletableFuture<?>> cancelingFutures = new ArrayList<>();
+ for (Execution execution : executionVertex.getCurrentExecutions()) {
+ if (!execution.getState().isTerminal()) {
+ execution.cancel();
+ cancelingFutures.add(execution.getReleaseFuture());
+ }
+ }
+ cancelAllPendingSlotRequests(executionVertexId);
+ return FutureUtils.combineAll(cancelingFutures);
+ }
+
+ @Override
+ protected void onTaskFailed(final Execution execution) {
+ final SpeculativeExecutionVertex executionVertex =
+ getExecutionVertex(execution.getVertex().getID());
+ final ExecutionAttemptID attemptId = execution.getAttemptId();
+
+ // when an execution fails, remove it from current executions to make
room for future
+ // speculative executions
+ executionVertex.archiveFailedExecution(attemptId);
+ executionSlotAllocator.cancel(attemptId);
+
+ super.onTaskFailed(execution);
+ }
+
+ @Override
+ protected void handleTaskFailure(
+ final Execution failedExecution, @Nullable final Throwable error) {
+
+ final SpeculativeExecutionVertex executionVertex =
+ getExecutionVertex(failedExecution.getVertex().getID());
+
+ // if the execution vertex is not possible finish or a
PartitionException occurred, trigger
+ // an execution vertex failover to recover
+ if (!isExecutionVertexPossibleToFinish(executionVertex)
+ || ExceptionUtils.findThrowable(error,
PartitionException.class).isPresent()) {
+ super.handleTaskFailure(failedExecution, error);
+ } else {
+ // add the execution failure to exception history even though not
restarting the entire
+ // execution vertex
+ final long timestamp = System.currentTimeMillis();
+ setGlobalFailureCause(error, timestamp);
+ final FailureHandlingResult failureHandlingResult =
+ executionFailureHandler.getFailureHandlingResult(
+ failedExecution, error, timestamp);
+ if (failureHandlingResult.canRestart()) {
+ archiveFromFailureHandlingResult(
+
createFailureHandlingResultSnapshot(failureHandlingResult));
+ } else {
+ failJob(error, timestamp);
+ }
+ }
+ }
+
+ private static boolean isExecutionVertexPossibleToFinish(
+ final SpeculativeExecutionVertex executionVertex) {
+ boolean anyExecutionPossibleToFinish = false;
+ for (Execution execution : executionVertex.getCurrentExecutions()) {
+ // if any execution has finished, no execution of the same
execution vertex should fail
+ // after that
+ checkState(execution.getState() != ExecutionState.FINISHED);
+
+ if (execution.getState() == ExecutionState.CREATED
+ || execution.getState() == ExecutionState.SCHEDULED
+ || execution.getState() == ExecutionState.DEPLOYING
+ || execution.getState() == ExecutionState.INITIALIZING
+ || execution.getState() == ExecutionState.RUNNING) {
+ anyExecutionPossibleToFinish = true;
+ }
+ }
+ return anyExecutionPossibleToFinish;
+ }
+
+ @Override
+ protected void cancelAllPendingSlotRequestsInternal() {
+ IterableUtils.toStream(getSchedulingTopology().getVertices())
+ .map(Vertex::getId)
+ .forEach(this::cancelAllPendingSlotRequests);
+ }
+
+ @Override
+ protected void cancelAllPendingSlotRequestsForVertices(
+ final Set<ExecutionVertexID> executionVertices) {
+ executionVertices.forEach(this::cancelAllPendingSlotRequests);
+ }
+
+ private void cancelAllPendingSlotRequests(final ExecutionVertexID
executionVertexId) {
+ final SpeculativeExecutionVertex executionVertex =
getExecutionVertex(executionVertexId);
+ executionVertex
+ .getCurrentExecutions()
+ .forEach(e -> executionSlotAllocator.cancel(e.getAttemptId()));
+ }
+
+ @Override
+ public void notifySlowTasks(Map<ExecutionVertexID,
Collection<ExecutionAttemptID>> slowTasks) {
+ // add slow nodes to blocklist before scheduling new speculative
executions
+ final long blockedEndTimestamp =
+ System.currentTimeMillis() + blockSlowNodeDuration.toMillis();
+ final Collection<BlockedNode> nodesToBlock =
+ getSlowNodeIds(slowTasks).stream()
+ .map(
+ nodeId ->
+ new BlockedNode(
+ nodeId,
+ "Node is detected to be slow.",
+ blockedEndTimestamp))
+ .collect(Collectors.toList());
+ blocklistHandler.addNewBlockedNodes(nodesToBlock);
+
+ final List<Execution> newSpeculativeExecutions = new ArrayList<>();
+ final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
+ for (ExecutionVertexID executionVertexId : slowTasks.keySet()) {
+ final SpeculativeExecutionVertex executionVertex =
+ getExecutionVertex(executionVertexId);
+
+ if (executionVertex.containsSources() ||
executionVertex.containsSinks()) {
+ continue;
+ }
+
+ final int currentConcurrentExecutions =
executionVertex.getCurrentExecutions().size();
+ final int newSpeculativeExecutionsToDeploy =
+ maxConcurrentExecutions - currentConcurrentExecutions;
+ if (newSpeculativeExecutionsToDeploy > 0) {
+ log.info(
+ "{} ({}) is detected as a slow vertex, create and
deploy {} new speculative executions for it.",
Review Comment:
Maybe also print the number of current concurrent executions here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import
org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import
org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.topology.Vertex;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The speculative scheduler. */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+ implements SlowTaskDetectorListener {
+
+ private final int maxConcurrentExecutions;
+
+ private final Duration blockSlowNodeDuration;
+
+ private final BlocklistHandler blocklistHandler;
+
+ private final SlowTaskDetector slowTaskDetector;
+
+ public SpeculativeScheduler(
+ final Logger log,
+ final JobGraph jobGraph,
+ final Executor ioExecutor,
+ final Configuration jobMasterConfiguration,
+ final Consumer<ComponentMainThreadExecutor> startUpAction,
+ final ScheduledExecutor delayExecutor,
+ final ClassLoader userCodeLoader,
+ final CheckpointsCleaner checkpointsCleaner,
+ final CheckpointRecoveryFactory checkpointRecoveryFactory,
+ final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ final SchedulingStrategyFactory schedulingStrategyFactory,
+ final FailoverStrategy.Factory failoverStrategyFactory,
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ final ExecutionOperations executionOperations,
+ final ExecutionVertexVersioner executionVertexVersioner,
+ final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+ long initializationTimestamp,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final JobStatusListener jobStatusListener,
+ final ExecutionGraphFactory executionGraphFactory,
+ final ShuffleMaster<?> shuffleMaster,
+ final Time rpcTimeout,
+ final VertexParallelismDecider vertexParallelismDecider,
+ final int defaultMaxParallelism,
+ final BlocklistHandler blocklistHandler)
+ throws Exception {
+
+ super(
+ log,
+ jobGraph,
+ ioExecutor,
+ jobMasterConfiguration,
+ startUpAction,
+ delayExecutor,
+ userCodeLoader,
+ checkpointsCleaner,
+ checkpointRecoveryFactory,
+ jobManagerJobMetricGroup,
+ schedulingStrategyFactory,
+ failoverStrategyFactory,
+ restartBackoffTimeStrategy,
+ executionOperations,
+ executionVertexVersioner,
+ executionSlotAllocatorFactory,
+ initializationTimestamp,
+ mainThreadExecutor,
+ jobStatusListener,
+ executionGraphFactory,
+ shuffleMaster,
+ rpcTimeout,
+ vertexParallelismDecider,
+ defaultMaxParallelism);
+
+ this.maxConcurrentExecutions =
+ jobMasterConfiguration.getInteger(
+
JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+ this.blockSlowNodeDuration =
+
jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+ this.blocklistHandler = checkNotNull(blocklistHandler);
+
+ this.slowTaskDetector = new
ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+ }
+
+ @Override
+ protected void startSchedulingInternal() {
+ super.startSchedulingInternal();
+ slowTaskDetector.start(getExecutionGraph(), this,
getMainThreadExecutor());
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> future = super.closeAsync();
+ slowTaskDetector.stop();
+ return future;
+ }
+
+ @Override
+ public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID
executionVertexId) {
+ return (SpeculativeExecutionVertex)
super.getExecutionVertex(executionVertexId);
+ }
+
+ @Override
+ protected void onTaskFinished(final Execution execution) {
+ // cancel all un-terminated executions because the execution vertex
has finished
+
FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+ initializeVerticesIfPossible();
+
+ super.onTaskFinished(execution);
+ }
+
+ private CompletableFuture<?> cancelPendingExecutions(
+ final ExecutionVertexID executionVertexId) {
+ final SpeculativeExecutionVertex executionVertex =
getExecutionVertex(executionVertexId);
+
+ final List<CompletableFuture<?>> cancelingFutures = new ArrayList<>();
+ for (Execution execution : executionVertex.getCurrentExecutions()) {
+ if (!execution.getState().isTerminal()) {
+ execution.cancel();
+ cancelingFutures.add(execution.getReleaseFuture());
+ }
+ }
+ cancelAllPendingSlotRequests(executionVertexId);
+ return FutureUtils.combineAll(cancelingFutures);
+ }
+
+ @Override
+ protected void onTaskFailed(final Execution execution) {
+ final SpeculativeExecutionVertex executionVertex =
+ getExecutionVertex(execution.getVertex().getID());
+ final ExecutionAttemptID attemptId = execution.getAttemptId();
+
+ // when an execution fails, remove it from current executions to make
room for future
+ // speculative executions
+ executionVertex.archiveFailedExecution(attemptId);
+ executionSlotAllocator.cancel(attemptId);
+
+ super.onTaskFailed(execution);
+ }
+
+ @Override
+ protected void handleTaskFailure(
+ final Execution failedExecution, @Nullable final Throwable error) {
+
+ final SpeculativeExecutionVertex executionVertex =
+ getExecutionVertex(failedExecution.getVertex().getID());
+
+ // if the execution vertex is not possible finish or a
PartitionException occurred, trigger
+ // an execution vertex failover to recover
+ if (!isExecutionVertexPossibleToFinish(executionVertex)
+ || ExceptionUtils.findThrowable(error,
PartitionException.class).isPresent()) {
+ super.handleTaskFailure(failedExecution, error);
+ } else {
+ // add the execution failure to exception history even though not
restarting the entire
+ // execution vertex
+ final long timestamp = System.currentTimeMillis();
+ setGlobalFailureCause(error, timestamp);
+ final FailureHandlingResult failureHandlingResult =
+ executionFailureHandler.getFailureHandlingResult(
+ failedExecution, error, timestamp);
+ if (failureHandlingResult.canRestart()) {
+ archiveFromFailureHandlingResult(
+
createFailureHandlingResultSnapshot(failureHandlingResult));
+ } else {
+ failJob(error, timestamp);
+ }
+ }
+ }
+
+ private static boolean isExecutionVertexPossibleToFinish(
+ final SpeculativeExecutionVertex executionVertex) {
+ boolean anyExecutionPossibleToFinish = false;
+ for (Execution execution : executionVertex.getCurrentExecutions()) {
+ // if any execution has finished, no execution of the same
execution vertex should fail
+ // after that
+ checkState(execution.getState() != ExecutionState.FINISHED);
+
+ if (execution.getState() == ExecutionState.CREATED
+ || execution.getState() == ExecutionState.SCHEDULED
+ || execution.getState() == ExecutionState.DEPLOYING
+ || execution.getState() == ExecutionState.INITIALIZING
+ || execution.getState() == ExecutionState.RUNNING) {
+ anyExecutionPossibleToFinish = true;
+ }
+ }
+ return anyExecutionPossibleToFinish;
+ }
+
+ @Override
+ protected void cancelAllPendingSlotRequestsInternal() {
+ IterableUtils.toStream(getSchedulingTopology().getVertices())
+ .map(Vertex::getId)
+ .forEach(this::cancelAllPendingSlotRequests);
+ }
+
+ @Override
+ protected void cancelAllPendingSlotRequestsForVertices(
+ final Set<ExecutionVertexID> executionVertices) {
+ executionVertices.forEach(this::cancelAllPendingSlotRequests);
+ }
+
+ private void cancelAllPendingSlotRequests(final ExecutionVertexID
executionVertexId) {
+ final SpeculativeExecutionVertex executionVertex =
getExecutionVertex(executionVertexId);
+ executionVertex
+ .getCurrentExecutions()
+ .forEach(e -> executionSlotAllocator.cancel(e.getAttemptId()));
+ }
+
+ @Override
+ public void notifySlowTasks(Map<ExecutionVertexID,
Collection<ExecutionAttemptID>> slowTasks) {
+ // add slow nodes to blocklist before scheduling new speculative
executions
+ final long blockedEndTimestamp =
+ System.currentTimeMillis() + blockSlowNodeDuration.toMillis();
+ final Collection<BlockedNode> nodesToBlock =
+ getSlowNodeIds(slowTasks).stream()
+ .map(
+ nodeId ->
+ new BlockedNode(
+ nodeId,
+ "Node is detected to be slow.",
+ blockedEndTimestamp))
+ .collect(Collectors.toList());
+ blocklistHandler.addNewBlockedNodes(nodesToBlock);
+
+ final List<Execution> newSpeculativeExecutions = new ArrayList<>();
+ final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
+ for (ExecutionVertexID executionVertexId : slowTasks.keySet()) {
+ final SpeculativeExecutionVertex executionVertex =
+ getExecutionVertex(executionVertexId);
+
+ if (executionVertex.containsSources() ||
executionVertex.containsSinks()) {
Review Comment:
I think we will change the logic here once sources are supported?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java:
##########
@@ -221,24 +204,26 @@ public void testReturningLogicalSlotsRemovesSharedSlot()
throws Exception {
}
@Test
- public void
testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws
Exception {
+ void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot()
throws Exception {
// physical slot request is not completed and does not complete
logical requests
testLogicalSlotRequestCancellationOrRelease(
true,
true,
(context, assignment) -> {
context.getAllocator().cancel(assignment.getExecutionAttemptId());
- try {
- assignment.getLogicalSlotFuture().get();
- fail("The logical future must finish with the
cancellation exception");
- } catch (InterruptedException | ExecutionException e) {
- assertThat(e.getCause(),
instanceOf(CancellationException.class));
- }
+ assertThatThrownBy(
+ () -> {
+ context.getAllocator()
+
.cancel(assignment.getExecutionAttemptId());
+
assignment.getLogicalSlotFuture().get();
+ })
+ .as("The logical future must finish with the
cancellation exception.")
+ .hasCauseInstanceOf(CancellationException.class);
Review Comment:
Maybe `hasRootCauseInstanceOf`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]