This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b96d476d4cd05de1e6c0b001f2477aaed98bd473 Author: Zhu Zhu <[email protected]> AuthorDate: Thu Jun 23 20:38:38 2022 +0800 [FLINK-28134][runtime] Introduce SpeculativeExecutionJobVertex and SpeculativeExecutionVertex This closes #20082. --- .../ArchivedSpeculativeExecutionVertex.java | 52 ++++ .../executiongraph/DefaultExecutionGraph.java | 11 +- .../DefaultExecutionGraphBuilder.java | 6 +- .../runtime/executiongraph/ExecutionJobVertex.java | 31 ++- .../runtime/executiongraph/ExecutionVertex.java | 135 +++++----- .../SpeculativeExecutionJobVertex.java | 67 +++++ .../executiongraph/SpeculativeExecutionVertex.java | 276 +++++++++++++++++++++ .../scheduler/DefaultExecutionGraphFactory.java | 14 +- .../AdaptiveBatchSchedulerFactory.java | 4 +- .../SpeculativeExecutionVertexTest.java | 226 +++++++++++++++++ .../TestingDefaultExecutionGraphBuilder.java | 10 +- .../scheduler/DefaultExecutionDeployerTest.java | 13 +- .../scheduler/TestingInternalFailuresListener.java | 47 ++++ .../AdaptiveBatchSchedulerTestUtils.java | 4 +- 14 files changed, 811 insertions(+), 85 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java new file mode 100644 index 00000000000..263049414c5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +/** + * {@link ArchivedSpeculativeExecutionVertex} is a readonly representation of {@link + * SpeculativeExecutionVertex}. + */ +public class ArchivedSpeculativeExecutionVertex extends ArchivedExecutionVertex { + + private static final long serialVersionUID = 1L; + + public ArchivedSpeculativeExecutionVertex(SpeculativeExecutionVertex vertex) { + super( + vertex.getParallelSubtaskIndex(), + vertex.getTaskNameWithSubtaskIndex(), + vertex.getCurrentExecutionAttempt().archive(), + getCopyOfExecutionHistory(vertex)); + } + + private static ExecutionHistory getCopyOfExecutionHistory(SpeculativeExecutionVertex vertex) { + final ExecutionHistory executionHistory = + ArchivedExecutionVertex.getCopyOfExecutionHistory(vertex); + + // add all the executions to the execution history except for the only admitted current + // execution + final Execution currentAttempt = vertex.getCurrentExecutionAttempt(); + for (Execution execution : vertex.getCurrentExecutions()) { + if (execution.getAttemptNumber() != currentAttempt.getAttemptNumber()) { + executionHistory.add(execution.archive()); + } + } + + return executionHistory; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 40082ae1258..4e144050e7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -286,6 +286,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG private final boolean isDynamic; + private final ExecutionJobVertex.Factory executionJobVertexFactory; + // -------------------------------------------------------------------------------------------- // Constructors // -------------------------------------------------------------------------------------------- @@ -307,7 +309,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG long initializationTimestamp, VertexAttemptNumberStore initialAttemptCounts, VertexParallelismStore vertexParallelismStore, - boolean isDynamic) + boolean isDynamic, + ExecutionJobVertex.Factory executionJobVertexFactory) throws IOException { this.executionGraphId = new ExecutionGraphID(); @@ -375,6 +378,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG this.isDynamic = isDynamic; + this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory); + LOG.info( "Created execution graph {} for job {}.", executionGraphId, @@ -840,7 +845,9 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG parallelismStore.getParallelismInfo(jobVertex.getID()); // create the execution job vertex and attach it to the graph - ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, parallelismInfo); + ExecutionJobVertex ejv = + executionJobVertexFactory.createExecutionJobVertex( + this, jobVertex, parallelismInfo); ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); if (previousTask != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index 0d47f5164e2..3b521327bbc 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -91,7 +91,8 @@ public class DefaultExecutionGraphBuilder { VertexAttemptNumberStore vertexAttemptNumberStore, VertexParallelismStore vertexParallelismStore, Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory, - boolean isDynamicGraph) + boolean isDynamicGraph, + ExecutionJobVertex.Factory executionJobVertexFactory) throws JobExecutionException, JobException { checkNotNull(jobGraph, "job graph cannot be null"); @@ -136,7 +137,8 @@ public class DefaultExecutionGraphBuilder { initializationTimestamp, vertexAttemptNumberStore, vertexParallelismStore, - isDynamicGraph); + isDynamicGraph, + executionJobVertexFactory); } catch (IOException e) { throw new JobException("Could not create the ExecutionGraph.", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index bc9dfd94bb5..7ac2156b5bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -188,7 +188,7 @@ public class ExecutionJobVertex // create all task vertices for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) { ExecutionVertex vertex = - new ExecutionVertex( + createExecutionVertex( this, i, producedDataSets, @@ -260,6 +260,24 @@ public class ExecutionJobVertex } } + protected ExecutionVertex createExecutionVertex( + ExecutionJobVertex jobVertex, + int subTaskIndex, + IntermediateResult[] producedDataSets, + Time timeout, + long createTimestamp, + int executionHistorySizeLimit, + int initialAttemptCount) { + return new ExecutionVertex( + jobVertex, + subTaskIndex, + 
producedDataSets, + timeout, + createTimestamp, + executionHistorySizeLimit, + initialAttemptCount); + } + public boolean isInitialized() { return taskVertices != null; } @@ -596,4 +614,15 @@ public class ExecutionJobVertex return ExecutionState.CREATED; } } + + /** Factory to create {@link ExecutionJobVertex}. */ + public static class Factory { + ExecutionJobVertex createExecutionJobVertex( + InternalExecutionGraphAccessor graph, + JobVertex jobVertex, + VertexParallelismInformation parallelismInfo) + throws JobException { + return new ExecutionJobVertex(graph, jobVertex, parallelismInfo); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 5da5bf7c8b6..abda5427f4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -49,6 +49,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; +import static org.apache.flink.util.Preconditions.checkState; /** * The ExecutionVertex is a parallel subtask of the execution. 
It may be executed once, or several @@ -61,7 +62,7 @@ public class ExecutionVertex // -------------------------------------------------------------------------------------------- - private final ExecutionJobVertex jobVertex; + final ExecutionJobVertex jobVertex; private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions; @@ -69,7 +70,7 @@ public class ExecutionVertex private final ExecutionVertexID executionVertexId; - private final ExecutionHistory executionHistory; + final ExecutionHistory executionHistory; private final Time timeout; @@ -77,9 +78,11 @@ public class ExecutionVertex private final String taskNameWithSubtask; /** The current or latest execution attempt of this vertex's task. */ - private Execution currentExecution; // this field must never be null + Execution currentExecution; // this field must never be null - private final ArrayList<InputSplit> inputSplits; + final ArrayList<InputSplit> inputSplits; + + private int nextAttemptNumber; /** This field holds the allocation id of the last successful assignment. 
*/ @Nullable private TaskManagerLocation lastAssignedLocation; @@ -134,24 +137,33 @@ public class ExecutionVertex this.executionHistory = new ExecutionHistory(executionHistorySizeLimit); - this.currentExecution = - new Execution( - getExecutionGraphAccessor().getFutureExecutor(), - this, - initialAttemptCount, - createTimestamp, - timeout); - - getExecutionGraphAccessor().registerExecution(currentExecution); + this.nextAttemptNumber = initialAttemptCount; this.timeout = timeout; this.inputSplits = new ArrayList<>(); + + this.currentExecution = createNewExecution(createTimestamp); + + getExecutionGraphAccessor().registerExecution(currentExecution); } // -------------------------------------------------------------------------------------------- // Properties // -------------------------------------------------------------------------------------------- + Execution createNewExecution(final long timestamp) { + return new Execution( + getExecutionGraphAccessor().getFutureExecutor(), + this, + nextAttemptNumber++, + timestamp, + timeout); + } + + public Execution getPartitionProducer() { + return currentExecution; + } + public JobID getJobId() { return this.jobVertex.getJobId(); } @@ -248,30 +260,30 @@ public class ExecutionVertex @Override public ExecutionState getExecutionState() { - return currentExecution.getState(); + return getCurrentExecutionAttempt().getState(); } @Override public long getStateTimestamp(ExecutionState state) { - return currentExecution.getStateTimestamp(state); + return getCurrentExecutionAttempt().getStateTimestamp(state); } @Override public Optional<ErrorInfo> getFailureInfo() { - return currentExecution.getFailureInfo(); + return getCurrentExecutionAttempt().getFailureInfo(); } public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() { - return currentExecution.getTaskManagerLocationFuture(); + return getCurrentExecutionAttempt().getTaskManagerLocationFuture(); } public LogicalSlot getCurrentAssignedResource() { - 
return currentExecution.getAssignedResource(); + return getCurrentExecutionAttempt().getAssignedResource(); } @Override public TaskManagerLocation getCurrentAssignedResourceLocation() { - return currentExecution.getAssignedResourceLocation(); + return getCurrentExecutionAttempt().getAssignedResourceLocation(); } @Override @@ -341,57 +353,56 @@ public class ExecutionVertex } private void resetForNewExecutionInternal(final long timestamp) { - final Execution oldExecution = currentExecution; - final ExecutionState oldState = oldExecution.getState(); - - if (oldState.isTerminal()) { - if (oldState == FINISHED) { - // pipelined partitions are released in Execution#cancel(), covering both job - // failures and vertex resets - // do not release pipelined partitions here to save RPC calls - oldExecution.handlePartitionCleanup(false, true); - getExecutionGraphAccessor() - .getPartitionGroupReleaseStrategy() - .vertexUnfinished(executionVertexId); - } + final boolean isFinished = (getExecutionState() == FINISHED); - executionHistory.add(oldExecution.archive()); + resetExecutionsInternal(); - final Execution newExecution = - new Execution( - getExecutionGraphAccessor().getFutureExecutor(), - this, - oldExecution.getAttemptNumber() + 1, - timestamp, - timeout); + InputSplitAssigner assigner = jobVertex.getSplitAssigner(); + if (assigner != null) { + assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex()); + inputSplits.clear(); + } - currentExecution = newExecution; + // if the execution was 'FINISHED' before, tell the ExecutionGraph that + // we take one step back on the road to reaching global FINISHED + if (isFinished) { + getJobVertex().executionVertexUnFinished(); + } - synchronized (inputSplits) { - InputSplitAssigner assigner = jobVertex.getSplitAssigner(); - if (assigner != null) { - assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex()); - inputSplits.clear(); - } - } + // reset the intermediate results + for (IntermediateResultPartition 
resultPartition : resultPartitions.values()) { + resultPartition.resetForNewExecution(); + } - // register this execution at the execution graph, to receive call backs - getExecutionGraphAccessor().registerExecution(newExecution); + final Execution newExecution = createNewExecution(timestamp); + currentExecution = newExecution; - // if the execution was 'FINISHED' before, tell the ExecutionGraph that - // we take one step back on the road to reaching global FINISHED - if (oldState == FINISHED) { - getJobVertex().executionVertexUnFinished(); - } + // register this execution to the execution graph, to receive call backs + getExecutionGraphAccessor().registerExecution(newExecution); + } - // reset the intermediate results - for (IntermediateResultPartition resultPartition : resultPartitions.values()) { - resultPartition.resetForNewExecution(); - } - } else { - throw new IllegalStateException( - "Cannot reset a vertex that is in non-terminal state " + oldState); + void resetExecutionsInternal() { + resetExecution(currentExecution); + } + + void resetExecution(final Execution execution) { + final ExecutionState oldState = execution.getState(); + + checkState( + oldState.isTerminal(), + "Cannot reset an execution that is in non-terminal state " + oldState); + + if (oldState == FINISHED) { + // pipelined partitions are released in Execution#cancel(), covering both job + // failures and vertex resets + // do not release pipelined partitions here to save RPC calls + execution.handlePartitionCleanup(false, true); + getExecutionGraphAccessor() + .getPartitionGroupReleaseStrategy() + .vertexUnfinished(executionVertexId); } + + executionHistory.add(execution.archive()); } public void tryAssignResource(LogicalSlot slot) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java new file mode 100644 index 
00000000000..76118ef5544 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.VertexParallelismInformation; + +/** The ExecutionJobVertex which supports speculative execution. 
*/ +public class SpeculativeExecutionJobVertex extends ExecutionJobVertex { + + public SpeculativeExecutionJobVertex( + InternalExecutionGraphAccessor graph, + JobVertex jobVertex, + VertexParallelismInformation parallelismInfo) + throws JobException { + super(graph, jobVertex, parallelismInfo); + } + + @Override + protected ExecutionVertex createExecutionVertex( + ExecutionJobVertex jobVertex, + int subTaskIndex, + IntermediateResult[] producedDataSets, + Time timeout, + long createTimestamp, + int executionHistorySizeLimit, + int initialAttemptCount) { + return new SpeculativeExecutionVertex( + jobVertex, + subTaskIndex, + producedDataSets, + timeout, + createTimestamp, + executionHistorySizeLimit, + initialAttemptCount); + } + + /** Factory to create {@link SpeculativeExecutionJobVertex}. */ + public static class Factory extends ExecutionJobVertex.Factory { + @Override + ExecutionJobVertex createExecutionJobVertex( + InternalExecutionGraphAccessor graph, + JobVertex jobVertex, + VertexParallelismInformation parallelismInfo) + throws JobException { + return new SpeculativeExecutionJobVertex(graph, jobVertex, parallelismInfo); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java new file mode 100644 index 00000000000..f819c4539b7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.execution.ExecutionState.FAILED; +import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** The ExecutionVertex which supports speculative execution. 
*/ +public class SpeculativeExecutionVertex extends ExecutionVertex { + + private final Map<ExecutionAttemptID, Execution> currentExecutions; + + public SpeculativeExecutionVertex( + ExecutionJobVertex jobVertex, + int subTaskIndex, + IntermediateResult[] producedDataSets, + Time timeout, + long createTimestamp, + int executionHistorySizeLimit, + int initialAttemptCount) { + super( + jobVertex, + subTaskIndex, + producedDataSets, + timeout, + createTimestamp, + executionHistorySizeLimit, + initialAttemptCount); + + this.currentExecutions = new LinkedHashMap<>(); + this.currentExecutions.put(currentExecution.getAttemptId(), currentExecution); + } + + public boolean containsSources() { + return getJobVertex().getJobVertex().containsSources(); + } + + public boolean containsSinks() { + return getJobVertex().getJobVertex().containsSinks(); + } + + public Execution createNewSpeculativeExecution(final long timestamp) { + final Execution newExecution = createNewExecution(timestamp); + getExecutionGraphAccessor().registerExecution(newExecution); + currentExecutions.put(newExecution.getAttemptId(), newExecution); + return newExecution; + } + + @Override + public Collection<Execution> getCurrentExecutions() { + return Collections.unmodifiableCollection(currentExecutions.values()); + } + + @Override + public Execution getPartitionProducer() { + final Execution finishedExecution = getCurrentExecutionAttempt(); + checkState( + finishedExecution.getState() == FINISHED, + "It's not allowed to get the partition producer of an un-finished SpeculativeExecutionVertex"); + return finishedExecution; + } + + @Override + public CompletableFuture<?> cancel() { + final List<CompletableFuture<?>> cancelResultFutures = + new ArrayList<>(currentExecutions.size()); + for (Execution execution : currentExecutions.values()) { + execution.cancel(); + cancelResultFutures.add(execution.getReleaseFuture()); + } + return FutureUtils.combineAll(cancelResultFutures); + } + + @Override + public 
CompletableFuture<?> suspend() { + return FutureUtils.combineAll( + currentExecutions.values().stream() + .map(Execution::suspend) + .collect(Collectors.toList())); + } + + @Override + public void fail(Throwable t) { + currentExecutions.values().forEach(e -> e.fail(t)); + } + + @Override + public void markFailed(Throwable t) { + currentExecutions.values().forEach(e -> e.markFailed(t)); + } + + @Override + public void resetForNewExecution() { + super.resetForNewExecution(); + + currentExecutions.clear(); + currentExecutions.put(currentExecution.getAttemptId(), currentExecution); + } + + @Override + void resetExecutionsInternal() { + for (Execution execution : currentExecutions.values()) { + resetExecution(execution); + } + } + + /** + * Remove execution from currentExecutions if it is failed. It is needed to make room for + * possible future speculative executions. + * + * @param executionAttemptId attemptID of the execution to be removed + */ + public void archiveFailedExecution(ExecutionAttemptID executionAttemptId) { + if (this.currentExecutions.size() <= 1) { + // Leave the last execution because currentExecutions should never be empty. This should + // happen only if all current executions have FAILED. A vertex reset will happen soon + // and will archive the remaining execution. 
+ return; + } + + final Execution removedExecution = this.currentExecutions.remove(executionAttemptId); + checkNotNull( + removedExecution, + "Cannot remove execution %s which does not exist.", + executionAttemptId); + checkState( + removedExecution.getState() == FAILED, + "Cannot remove execution %s which is not FAILED.", + executionAttemptId); + + executionHistory.add(removedExecution.archive()); + if (removedExecution == this.currentExecution) { + this.currentExecution = this.currentExecutions.values().iterator().next(); + } + } + + @Override + public Execution getCurrentExecutionAttempt() { + // returns the execution which is most likely to reach FINISHED state + Execution currentExecution = this.currentExecution; + for (Execution execution : currentExecutions.values()) { + if (getStatePriority(execution.getState()) + < getStatePriority(currentExecution.getState())) { + currentExecution = execution; + } + } + return currentExecution; + } + + private int getStatePriority(ExecutionState state) { + // the more likely to reach FINISHED state, the higher priority, the smaller value + switch (state) { + // CREATED/SCHEDULED/INITIALIZING/RUNNING/FINISHED are healthy states with an + // increasing priority + case FINISHED: + return 0; + case RUNNING: + return 1; + case INITIALIZING: + return 2; + case DEPLOYING: + return 3; + case SCHEDULED: + return 4; + case CREATED: + return 5; + // if the vertex is not in a healthy state, shows its CANCELING state unless it is + // fully FAILED or CANCELED + case CANCELING: + return 6; + case FAILED: + return 7; + case CANCELED: + return 8; + default: + throw new IllegalStateException("Execution state " + state + " is not supported."); + } + } + + @Override + void notifyPendingDeployment(Execution execution) { + getExecutionGraphAccessor() + .getExecutionDeploymentListener() + .onStartedDeployment( + execution.getAttemptId(), + execution.getAssignedResourceLocation().getResourceID()); + } + + @Override + void 
notifyCompletedDeployment(Execution execution) { + getExecutionGraphAccessor() + .getExecutionDeploymentListener() + .onCompletedDeployment(execution.getAttemptId()); + } + + @Override + void notifyStateTransition( + Execution execution, ExecutionState previousState, ExecutionState newState) { + getExecutionGraphAccessor().notifyExecutionChange(execution, previousState, newState); + } + + @Override + public ArchivedSpeculativeExecutionVertex archive() { + return new ArchivedSpeculativeExecutionVertex(this); + } + + @Override + void cachePartitionInfo(PartitionInfo partitionInfo) { + throw new UnsupportedOperationException( + "Method is not supported in SpeculativeExecutionVertex."); + } + + @Override + public void tryAssignResource(LogicalSlot slot) { + throw new UnsupportedOperationException( + "Method is not supported in SpeculativeExecutionVertex."); + } + + @Override + public void deploy() { + throw new UnsupportedOperationException( + "Method is not supported in SpeculativeExecutionVertex."); + } + + @Override + public void deployToSlot(LogicalSlot slot) { + throw new UnsupportedOperationException( + "Method is not supported in SpeculativeExecutionVertex."); + } + + @Override + public Optional<TaskManagerLocation> getPreferredLocationBasedOnState() { + throw new UnsupportedOperationException( + "Method is not supported in SpeculativeExecutionVertex."); + } + + @Override + public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() { + throw new UnsupportedOperationException( + "Method is not supported in SpeculativeExecutionVertex."); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index 192242fe952..6d3bcb54953 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener; import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener; import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; @@ -49,6 +50,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Supplier; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** Default {@link ExecutionGraphFactory} implementation. */ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { @@ -64,6 +67,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { private final JobMasterPartitionTracker jobMasterPartitionTracker; private final Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory; private final boolean isDynamicGraph; + private final ExecutionJobVertex.Factory executionJobVertexFactory; public DefaultExecutionGraphFactory( Configuration configuration, @@ -87,7 +91,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { blobWriter, shuffleMaster, jobMasterPartitionTracker, - false); + false, + new ExecutionJobVertex.Factory()); } public DefaultExecutionGraphFactory( @@ -101,7 +106,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { BlobWriter blobWriter, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker jobMasterPartitionTracker, - boolean isDynamicGraph) { + boolean isDynamicGraph, + 
ExecutionJobVertex.Factory executionJobVertexFactory) { this.configuration = configuration; this.userCodeClassLoader = userCodeClassLoader; this.executionDeploymentTracker = executionDeploymentTracker; @@ -120,6 +126,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { WebOptions.CHECKPOINTS_HISTORY_SIZE), jobManagerJobMetricGroup)); this.isDynamicGraph = isDynamicGraph; + this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory); } @Override @@ -167,7 +174,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { vertexAttemptNumberStore, vertexParallelismStore, checkpointStatsTrackerFactory, - isDynamicGraph); + isDynamicGraph, + executionJobVertexFactory); final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java index 0cc501203b8..3aaad1dbd0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; @@ -145,7 +146,8 @@ public class AdaptiveBatchSchedulerFactory 
implements SchedulerNGFactory { blobWriter, shuffleMaster, partitionTracker, - true); + true, + new ExecutionJobVertex.Factory()); return new AdaptiveBatchScheduler( log, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java new file mode 100644 index 00000000000..dff88244fee --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link SpeculativeExecutionVertex}. */ +class SpeculativeExecutionVertexTest { + + @RegisterExtension + private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + private TestingInternalFailuresListener internalFailuresListener; + + @BeforeEach + void setUp() { + internalFailuresListener = new TestingInternalFailuresListener(); + } + + @Test + void testCreateSpeculativeExecution() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + assertThat(ev.getCurrentExecutions()).hasSize(1); + + ev.createNewSpeculativeExecution(System.currentTimeMillis()); + assertThat(ev.getCurrentExecutions()).hasSize(2); + } + + @Test + void testResetExecutionVertex() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + final Execution e1 = ev.getCurrentExecutionAttempt(); + final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis()); + + e1.transitionState(ExecutionState.RUNNING); + 
e1.markFinished(); + e2.cancel(); + ev.resetForNewExecution(); + + assertThat(ev.getExecutionHistory().getHistoricalExecution(0).get().getAttemptId()) + .isEqualTo(e1.getAttemptId()); + assertThat(ev.getExecutionHistory().getHistoricalExecution(1).get().getAttemptId()) + .isEqualTo(e2.getAttemptId()); + assertThat(ev.getCurrentExecutions()).hasSize(1); + assertThat(ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2); + } + + @Test + void testCancel() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + final Execution e1 = ev.getCurrentExecutionAttempt(); + final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis()); + + ev.cancel(); + assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED); + assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED); + } + + @Test + void testSuspend() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + final Execution e1 = ev.getCurrentExecutionAttempt(); + final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis()); + + ev.suspend(); + assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED); + assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED); + } + + @Test + void testFail() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + final Execution e1 = ev.getCurrentExecutionAttempt(); + final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis()); + + ev.fail(new Exception("Forced test failure.")); + assertThat(internalFailuresListener.getFailedTasks()) + .containsExactly(e1.getAttemptId(), e2.getAttemptId()); + } + + @Test + void testMarkFailed() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + final Execution e1 = ev.getCurrentExecutionAttempt(); + final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis()); + + ev.markFailed(new 
Exception("Forced test failure.")); + assertThat(internalFailuresListener.getFailedTasks()) + .containsExactly(e1.getAttemptId(), e2.getAttemptId()); + } + + @Test + void testArchiveFailedExecutions() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + + final Execution e1 = ev.getCurrentExecutionAttempt(); + e1.transitionState(ExecutionState.RUNNING); + + final Execution e2 = ev.createNewSpeculativeExecution(0); + e2.transitionState(ExecutionState.FAILED); + + ev.archiveFailedExecution(e2.getAttemptId()); + assertThat(ev.getCurrentExecutions()).hasSize(1); + assertThat(ev.currentExecution).isSameAs(e1); + + final Execution e3 = ev.createNewSpeculativeExecution(0); + e3.transitionState(ExecutionState.RUNNING); + e1.transitionState(ExecutionState.FAILED); + + ev.archiveFailedExecution(e1.getAttemptId()); + assertThat(ev.getCurrentExecutions()).hasSize(1); + assertThat(ev.currentExecution).isSameAs(e3); + } + + @Test + void testArchiveTheOnlyCurrentExecution() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + + final Execution e1 = ev.getCurrentExecutionAttempt(); + e1.transitionState(ExecutionState.FAILED); + + ev.archiveFailedExecution(e1.getAttemptId()); + + assertThat(ev.getCurrentExecutions()).hasSize(1); + assertThat(ev.currentExecution).isSameAs(e1); + } + + @Test + void testArchiveNonFailedExecutionWithArchiveFailedExecutionMethod() { + Assertions.assertThrows( + IllegalStateException.class, + () -> { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + + final Execution e1 = ev.getCurrentExecutionAttempt(); + e1.transitionState(ExecutionState.FAILED); + + final Execution e2 = ev.createNewSpeculativeExecution(0); + e2.transitionState(ExecutionState.RUNNING); + + ev.archiveFailedExecution(e2.getAttemptId()); + }); + } + + @Test + void testGetExecutionState() throws Exception { + final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex(); + + final Execution e1 = ev.getCurrentExecutionAttempt(); + e1.transitionState(ExecutionState.CANCELED); + assertThat(ev.getExecutionState()).isSameAs(ExecutionState.CANCELED); + + // a state added later is more likely to reach the FINISHED state + final List<ExecutionState> statesSortedByPriority = new ArrayList<>(); + statesSortedByPriority.add(ExecutionState.FAILED); + statesSortedByPriority.add(ExecutionState.CANCELING); + statesSortedByPriority.add(ExecutionState.CREATED); + statesSortedByPriority.add(ExecutionState.SCHEDULED); + statesSortedByPriority.add(ExecutionState.DEPLOYING); + statesSortedByPriority.add(ExecutionState.INITIALIZING); + statesSortedByPriority.add(ExecutionState.RUNNING); + statesSortedByPriority.add(ExecutionState.FINISHED); + + for (ExecutionState state : statesSortedByPriority) { + final Execution execution = ev.createNewSpeculativeExecution(0); + execution.transitionState(state); + assertThat(ev.getExecutionState()).isSameAs(state); + } + } + + private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception { + final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1); + final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex); + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); + return (SpeculativeExecutionVertex) + executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0]; + } + + private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception { + final ExecutionGraph executionGraph = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobGraph(jobGraph) + .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory()) + .build(EXECUTOR_RESOURCE.getExecutor()); + + executionGraph.setInternalTaskFailuresListener(internalFailuresListener); + executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + + return executionGraph; + } +} diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index 8e5e57ddea0..62933dcee34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -73,6 +73,7 @@ public class TestingDefaultExecutionGraphBuilder { private ExecutionStateUpdateListener executionStateUpdateListener = (execution, previousState, newState) -> {}; private VertexParallelismStore vertexParallelismStore; + private ExecutionJobVertex.Factory executionJobVertexFactory = new ExecutionJobVertex.Factory(); private TestingDefaultExecutionGraphBuilder() {} @@ -142,6 +143,12 @@ public class TestingDefaultExecutionGraphBuilder { return this; } + public TestingDefaultExecutionGraphBuilder setExecutionJobVertexFactory( + ExecutionJobVertex.Factory executionJobVertexFactory) { + this.executionJobVertexFactory = executionJobVertexFactory; + return this; + } + private DefaultExecutionGraph build( boolean isDynamicGraph, ScheduledExecutorService executorService) throws JobException, JobExecutionException { @@ -168,7 +175,8 @@ public class TestingDefaultExecutionGraphBuilder { Optional.ofNullable(vertexParallelismStore) .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)), () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup()), - isDynamicGraph); + isDynamicGraph, + executionJobVertexFactory); } public DefaultExecutionGraph build(ScheduledExecutorService executorService) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java index 96c89093b92..3525fea6eae 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java @@ -333,18 +333,7 @@ class DefaultExecutionDeployerTest { .setPartitionTracker(partitionTracker) .build(executor); - executionGraph.setInternalTaskFailuresListener( - new InternalFailuresListener() { - @Override - public void notifyTaskFailure( - ExecutionAttemptID attemptId, - Throwable t, - boolean cancelTask, - boolean releasePartitions) {} - - @Override - public void notifyGlobalFailure(Throwable t) {} - }); + executionGraph.setInternalTaskFailuresListener(new TestingInternalFailuresListener()); executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); return executionGraph; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingInternalFailuresListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingInternalFailuresListener.java new file mode 100644 index 00000000000..56a257243ee --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingInternalFailuresListener.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** An {@link InternalFailuresListener} for tests that records the attempt IDs of failed tasks. */ +public class TestingInternalFailuresListener implements InternalFailuresListener { + + private final List<ExecutionAttemptID> failedTasks = new ArrayList<>(); + + @Override + public void notifyTaskFailure( + ExecutionAttemptID attemptId, + Throwable t, + boolean cancelTask, + boolean releasePartitions) { + failedTasks.add(attemptId); + } + + @Override + public void notifyGlobalFailure(Throwable t) {} + + public List<ExecutionAttemptID> getFailedTasks() { + return Collections.unmodifiableList(failedTasks); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java index 12740adb7cf..d462c726bc4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptivebatch; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory; @@ -72,7 +73,8 @@ public class AdaptiveBatchSchedulerTestUtils { blobWriter, 
shuffleMaster, partitionTracker, - true); + true, + new ExecutionJobVertex.Factory()); return new AdaptiveBatchScheduler( log,
