This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f4105f65fb510de11ccb08c4b31d5e4c30744151 Author: Matthias Pohl <[email protected]> AuthorDate: Thu Apr 15 10:12:25 2021 +0200 [FLINK-22276][runtime] Fixes the concurrency issue This commit fixes an issue where multiple failures can occur close to each other. In that case, the DefaultScheduler's restart logic competes for each of these failures. If multiple failures refer to the same Execution, it might be that the restart due to one failure handling cleans up the failure already. This leads to an IllegalArgumentException when archiving the next failure referring to the same Execution. The issue was that the code relied on ExecutionVertices instead of Executions. The new implementation relies on the Executions that were present when the failure was handled. Therefore, FailureHandlingResultSnapshot is introduced. It extracts the Execution information from the ExecutionGraph. Additionally, instead of accessing ExecutionVertex.getTaskNameWithSubtaskIndex() to collect the task name, the new implementation relies on Execution.getVertexWithAttempt(). This enables us to solely rely on the Execution without an extra dependency on the ExecutionVertex. The new implementation also removes the add method from RootExceptionHistoryEntry. This makes the instantiation cleaner. ExceptionHistoryEntryExtractor was replaced by the factory methods RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot and RootExceptionHistoryEntry.fromGlobalFailure as part of this effort. This closes #15640. 
--- .../flink/runtime/executiongraph/ErrorInfo.java | 20 +- .../flink/runtime/scheduler/DefaultScheduler.java | 8 +- .../flink/runtime/scheduler/SchedulerBase.java | 43 ++-- .../exceptionhistory/ExceptionHistoryEntry.java | 38 ++- .../ExceptionHistoryEntryExtractor.java | 157 ------------ .../FailureHandlingResultSnapshot.java | 152 ++++++++++++ .../RootExceptionHistoryEntry.java | 97 +++++--- .../rest/handler/job/JobExceptionsHandlerTest.java | 28 ++- .../runtime/scheduler/DefaultSchedulerTest.java | 15 +- .../ExceptionHistoryEntryTest.java | 149 +++++++++++ .../FailureHandlingResultSnapshotTest.java | 276 +++++++++++++++++++++ ...est.java => RootExceptionHistoryEntryTest.java} | 105 ++------ 12 files changed, 773 insertions(+), 315 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java index cfbfc6f..0663ea1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java @@ -49,12 +49,20 @@ public class ErrorInfo implements Serializable { */ public static ErrorInfo createErrorInfoWithNullableCause( @Nullable Throwable exception, long timestamp) { - return new ErrorInfo( - exception != null - ? exception - : new FlinkException( - "Unknown cause for Execution failure (this might be caused by FLINK-21376)."), - timestamp); + return new ErrorInfo(handleMissingThrowable(exception), timestamp); + } + + /** + * Utility method to cover FLINK-21376. + * + * @param throwable The actual exception. + * @return a {@link FlinkException} if no exception was passed. + */ + public static Throwable handleMissingThrowable(@Nullable Throwable throwable) { + return throwable != null + ? 
throwable + : new FlinkException( + "Unknown cause for Execution failure (this might be caused by FLINK-21376)."); } public ErrorInfo(@Nonnull Throwable exception, long timestamp) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 33e8636..c9d77de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder; +import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; @@ -251,12 +252,17 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart); + final FailureHandlingResultSnapshot failureHandlingResultSnapshot = + FailureHandlingResultSnapshot.create( + failureHandlingResult, + id -> this.getExecutionVertex(id).getCurrentExecutionAttempt()); delayExecutor.schedule( () -> FutureUtils.assertNoException( cancelFuture.thenRunAsync( () -> { - archiveFromFailureHandlingResult(failureHandlingResult); + archiveFromFailureHandlingResult( + failureHandlingResultSnapshot); restartTasks(executionVertexVersions, globalRecovery); }, getMainThreadExecutor())), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 0ab5672..c675e66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -52,7 +52,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; -import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -75,7 +74,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; -import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryExtractor; +import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager; @@ -148,7 +147,6 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling private final ComponentMainThreadExecutor mainThreadExecutor; - private final ExceptionHistoryEntryExtractor exceptionHistoryEntryExtractor; private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory; private final 
ExecutionGraphFactory executionGraphFactory; @@ -215,7 +213,6 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure); operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor); - this.exceptionHistoryEntryExtractor = new ExceptionHistoryEntryExtractor(); this.exceptionHistory = new BoundedFIFOQueue<>( jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE)); @@ -629,38 +626,42 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling return executionGraph.getTerminationFuture(); } - protected final void archiveGlobalFailure(@Nullable Throwable failure) { - archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)); + protected final void archiveGlobalFailure(Throwable failure) { + archiveGlobalFailure( + failure, + executionGraph.getStatusTimestamp(JobStatus.FAILED), + StreamSupport.stream(executionGraph.getAllExecutionVertices().spliterator(), false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .collect(Collectors.toSet())); } - private void archiveGlobalFailure(@Nullable Throwable failure, long timestamp) { + private void archiveGlobalFailure( + Throwable failure, long timestamp, Iterable<Execution> executions) { exceptionHistory.add( - exceptionHistoryEntryExtractor.extractGlobalFailure( - executionGraph.getAllExecutionVertices(), failure, timestamp)); + RootExceptionHistoryEntry.fromGlobalFailure(failure, timestamp, executions)); log.debug("Archive global failure.", failure); } protected final void archiveFromFailureHandlingResult( - FailureHandlingResult failureHandlingResult) { - if (failureHandlingResult.getExecutionVertexIdOfFailedTask().isPresent()) { - final ExecutionVertexID executionVertexId = - failureHandlingResult.getExecutionVertexIdOfFailedTask().get(); + FailureHandlingResultSnapshot failureHandlingResult) { + if 
(failureHandlingResult.getRootCauseExecution().isPresent()) { + final Execution rootCauseExecution = + failureHandlingResult.getRootCauseExecution().get(); + final RootExceptionHistoryEntry rootEntry = - exceptionHistoryEntryExtractor.extractLocalFailure( - executionGraph.getAllVertices(), - executionVertexId, - failureHandlingResult.getVerticesToRestart().stream() - .filter(v -> !executionVertexId.equals(v)) - .collect(Collectors.toSet())); + RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot( + failureHandlingResult); exceptionHistory.add(rootEntry); log.debug( "Archive local failure causing attempt {} to fail: {}", - executionVertexId, + rootCauseExecution.getAttemptId(), rootEntry.getExceptionAsString()); } else { archiveGlobalFailure( - failureHandlingResult.getError(), failureHandlingResult.getTimestamp()); + failureHandlingResult.getRootCause(), + failureHandlingResult.getTimestamp(), + failureHandlingResult.getConcurrentlyFailedExecution()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java index 08a69ef..40b782d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java @@ -20,8 +20,11 @@ package org.apache.flink.runtime.scheduler.exceptionhistory; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; @@ -39,7 
+42,40 @@ public class ExceptionHistoryEntry extends ErrorInfo { @Nullable private final String failingTaskName; @Nullable private final ArchivedTaskManagerLocation taskManagerLocation; - ExceptionHistoryEntry( + /** + * Creates an {@code ExceptionHistoryEntry} based on the provided {@code Execution}. + * + * @param failedExecution the failed {@code Execution}. + * @param taskName the name of the task. + * @return The {@code ExceptionHistoryEntry}. + * @throws IllegalArgumentException if the passed {@code Execution} does not provide a {@link + * Execution#getFailureInfo() failureInfo}. + */ + public static ExceptionHistoryEntry create(AccessExecution failedExecution, String taskName) { + Preconditions.checkArgument( + failedExecution.getFailureInfo().isPresent(), + "The selected Execution " + failedExecution.getAttemptId() + " didn't fail."); + + final ErrorInfo failure = failedExecution.getFailureInfo().get(); + return new ExceptionHistoryEntry( + failure.getException(), + failure.getTimestamp(), + taskName, + failedExecution.getAssignedResourceLocation()); + } + + /** + * Instantiates a {@code ExceptionHistoryEntry}. + * + * @param cause The reason for the failure. + * @param timestamp The time the failure was caught. + * @param failingTaskName The name of the task that failed. + * @param taskManagerLocation The host the task was running on. + * @throws NullPointerException if {@code cause} is {@code null}. + * @throws IllegalArgumentException if the passed {@code timestamp} is not bigger than {@code + * 0}. 
+ */ + protected ExceptionHistoryEntry( Throwable cause, long timestamp, @Nullable String failingTaskName, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractor.java deleted file mode 100644 index 98d4dcd..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractor.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.scheduler.exceptionhistory; - -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.SerializedThrowable; -import org.apache.flink.util.function.QuadFunction; - -import java.util.Map; - -/** - * {@code ExceptionHistoryEntryExtractor} extracts all the necessary information from given - * executions to create corresponding {@link RootExceptionHistoryEntry RootExceptionHistoryEntries}. - */ -public class ExceptionHistoryEntryExtractor { - - /** - * Extracts a {@link RootExceptionHistoryEntry} based on the passed local failure information. - * - * @param executionJobVertices The {@link ExecutionJobVertex} instances registry. - * @param failedExecutionVertexId The {@link ExecutionVertexID} referring to the {@link - * ExecutionVertex} that is the root of the failure. - * @param otherAffectedVertices The {@code ExecutionVertexID}s of other affected {@code - * ExecutionVertices} that, if failed as well, would be added as concurrent failures. - * @return The {@code RootExceptionHistoryEntry}. - * @throws IllegalArgumentException if one of the passed {@code ExecutionVertexID}s cannot be - * resolved into an {@code ExecutionVertex}. - * @throws IllegalArgumentException if the {@code failedExecutionVertexID} refers to an {@code - * ExecutionVertex} that didn't fail. 
- */ - public RootExceptionHistoryEntry extractLocalFailure( - Map<JobVertexID, ExecutionJobVertex> executionJobVertices, - ExecutionVertexID failedExecutionVertexId, - Iterable<ExecutionVertexID> otherAffectedVertices) { - final ExecutionVertex rootCauseExecutionVertex = - getExecutionVertex(executionJobVertices, failedExecutionVertexId); - - if (rootCauseExecutionVertex.getFailureInfo().isPresent()) { - final RootExceptionHistoryEntry root = - createLocalExceptionHistoryEntry( - RootExceptionHistoryEntry::fromLocalFailure, rootCauseExecutionVertex); - - for (ExecutionVertexID otherExecutionVertexId : otherAffectedVertices) { - final ExecutionVertex executionVertex = - getExecutionVertex(executionJobVertices, otherExecutionVertexId); - if (executionVertex.getFailureInfo().isPresent()) { - root.add( - createLocalExceptionHistoryEntry( - ExceptionHistoryEntry::new, executionVertex)); - } - } - - return root; - } - - return RootExceptionHistoryEntry.fromGlobalFailure( - new FlinkException( - "This is a workaround for FLINK-22276: The actual failure was cleaned up already."), - System.currentTimeMillis()); - } - - /** - * Extracts a {@link RootExceptionHistoryEntry} based on the global failure information. - * - * @param executionVertices The {@link ExecutionVertex ExecutionVertices} that are affected by - * the global failure and, if failed as well, would be added as concurrent failures to the - * entry. - * @param rootCause The {@code Throwable} causing the failure. - * @param timestamp The timestamp the failure occurred. - * @return The {@code RootExceptionHistoryEntry}. - * @throws IllegalArgumentException if one of the passed {@code ExecutionVertexID}s cannot be * - * resolved into an {@code ExecutionVertex}. 
- */ - public RootExceptionHistoryEntry extractGlobalFailure( - Iterable<ExecutionVertex> executionVertices, Throwable rootCause, long timestamp) { - final RootExceptionHistoryEntry root = - RootExceptionHistoryEntry.fromGlobalFailure(rootCause, timestamp); - - for (ExecutionVertex executionVertex : executionVertices) { - if (!executionVertex.getFailureInfo().isPresent()) { - continue; - } - - final ExceptionHistoryEntry exceptionHistoryEntry = - createLocalExceptionHistoryEntry(ExceptionHistoryEntry::new, executionVertex); - if (exceptionHistoryEntry != null) { - root.add(exceptionHistoryEntry); - } - } - - return root; - } - - private static ExecutionVertex getExecutionVertex( - Map<JobVertexID, ExecutionJobVertex> executionJobVertices, - ExecutionVertexID executionVertexID) { - final ExecutionJobVertex executionJobVertex = - executionJobVertices.get(executionVertexID.getJobVertexId()); - - Preconditions.checkArgument( - executionJobVertex != null, - "The passed ExecutionVertexID does not correspond to any ExecutionJobVertex provided."); - final ExecutionVertex[] executionVertices = executionJobVertex.getTaskVertices(); - - Preconditions.checkArgument( - executionVertices.length > executionVertexID.getSubtaskIndex(), - "The ExecutionJobVertex referred by the passed ExecutionVertexID does not have the right amount of subtasks (expected subtask ID: {}; actual number of subtasks: {}).", - executionVertexID.getSubtaskIndex(), - executionJobVertex.getTaskVertices().length); - return executionVertices[executionVertexID.getSubtaskIndex()]; - } - - private static <E extends ExceptionHistoryEntry> E createLocalExceptionHistoryEntry( - QuadFunction<SerializedThrowable, Long, String, TaskManagerLocation, E> - exceptionHistoryEntryCreator, - ExecutionVertex executionVertex) { - return executionVertex - .getFailureInfo() - .map( - failureInfo -> - exceptionHistoryEntryCreator.apply( - failureInfo.getException(), - failureInfo.getTimestamp(), - 
executionVertex.getTaskNameWithSubtaskIndex(), - executionVertex - .getCurrentExecutionAttempt() - .getAssignedResourceLocation())) - .orElseThrow( - () -> - new IllegalArgumentException( - "The selected ExecutionVertex " - + executionVertex.getID() - + " didn't fail.")); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java new file mode 100644 index 0000000..7ec115a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.exceptionhistory; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link FailureHandlingResult} + * providing the actual {@link Execution Executions}. + */ +public class FailureHandlingResultSnapshot { + + @Nullable private final Execution rootCauseExecution; + private final Throwable rootCause; + private final long timestamp; + private final Set<Execution> concurrentlyFailedExecutions; + + /** + * Creates a {@code FailureHandlingResultSnapshot} based on the passed {@link + * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}. + * + * @param failureHandlingResult The {@code FailureHandlingResult} that is used for extracting + * the failure information. + * @param latestExecutionLookup The look-up function for retrieving the latest {@link Execution} + * instance for a given {@link ExecutionVertexID}. + * @return The {@code FailureHandlingResultSnapshot}. 
+ */ + public static FailureHandlingResultSnapshot create( + FailureHandlingResult failureHandlingResult, + Function<ExecutionVertexID, Execution> latestExecutionLookup) { + final Execution rootCauseExecution = + failureHandlingResult + .getExecutionVertexIdOfFailedTask() + .map(latestExecutionLookup) + .orElse(null); + Preconditions.checkArgument( + rootCauseExecution == null || rootCauseExecution.getFailureInfo().isPresent(), + String.format( + "The execution %s didn't provide a failure info even though the corresponding ExecutionVertex %s is marked as having handled the root cause of this failure.", + // the "(null)" values should never be used due to the condition - it's just + // added to make the compiler happy + rootCauseExecution != null ? rootCauseExecution.getAttemptId() : "(null)", + failureHandlingResult + .getExecutionVertexIdOfFailedTask() + .map(Objects::toString) + .orElse("(null)"))); + + final ExecutionVertexID rootCauseExecutionVertexId = + failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null); + final Set<Execution> concurrentlyFailedExecutions = + failureHandlingResult.getVerticesToRestart().stream() + .filter( + executionVertexId -> + !executionVertexId.equals(rootCauseExecutionVertexId)) + .map(latestExecutionLookup) + .filter(execution -> execution.getFailureInfo().isPresent()) + .collect(Collectors.toSet()); + + return new FailureHandlingResultSnapshot( + rootCauseExecution, + ErrorInfo.handleMissingThrowable(failureHandlingResult.getError()), + failureHandlingResult.getTimestamp(), + concurrentlyFailedExecutions); + } + + @VisibleForTesting + FailureHandlingResultSnapshot( + @Nullable Execution rootCauseExecution, + Throwable rootCause, + long timestamp, + Set<Execution> concurrentlyFailedExecutions) { + Preconditions.checkArgument( + rootCauseExecution == null + || !concurrentlyFailedExecutions.contains(rootCauseExecution), + "The rootCauseExecution should not be part of the concurrentlyFailedExecutions map."); + + 
this.rootCauseExecution = rootCauseExecution; + this.rootCause = Preconditions.checkNotNull(rootCause); + this.timestamp = timestamp; + this.concurrentlyFailedExecutions = + Preconditions.checkNotNull(concurrentlyFailedExecutions); + } + + /** + * Returns the {@link Execution} that handled the root cause for this failure. An empty {@code + * Optional} will be returned if it's a global failure. + * + * @return The {@link Execution} that handled the root cause for this failure. + */ + public Optional<Execution> getRootCauseExecution() { + return Optional.ofNullable(rootCauseExecution); + } + + /** + * The actual failure that is handled. + * + * @return The {@code Throwable}. + */ + public Throwable getRootCause() { + return rootCause; + } + + /** + * The time the failure occurred. + * + * @return The time of the failure. + */ + public long getTimestamp() { + return timestamp; + } + + /** + * All {@link Execution Executions} that failed and are planned to be restarted as part of this + * failure handling. + * + * @return The concurrently failed {@code Executions}. 
+ */ + public Iterable<Execution> getConcurrentlyFailedExecution() { + return Collections.unmodifiableSet(concurrentlyFailedExecutions); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java index 400e3d3..f84b6fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java @@ -19,14 +19,14 @@ package org.apache.flink.runtime.scheduler.exceptionhistory; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.Collection; - -import static org.apache.flink.util.Preconditions.checkNotNull; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * {@code RootExceptionHistoryEntry} extending {@link ExceptionHistoryEntry} by providing a list of @@ -36,57 +36,94 @@ public class RootExceptionHistoryEntry extends ExceptionHistoryEntry { private static final long serialVersionUID = -7647332765867297434L; - private final Collection<ExceptionHistoryEntry> concurrentExceptions = new ArrayList<>(); + private final Set<ExceptionHistoryEntry> concurrentExceptions; + + /** + * Creates a {@code RootExceptionHistoryEntry} based on the passed {@link + * FailureHandlingResultSnapshot}. + * + * @param snapshot The reason for the failure. + * @return The {@code RootExceptionHistoryEntry} instance. + * @throws NullPointerException if {@code cause} or {@code failingTaskName} are {@code null}. 
+ * @throws IllegalArgumentException if the {@code timestamp} of the passed {@code + * FailureHandlingResult} is not bigger than {@code 0}. + */ + public static RootExceptionHistoryEntry fromFailureHandlingResultSnapshot( + FailureHandlingResultSnapshot snapshot) { + String failingTaskName = null; + TaskManagerLocation taskManagerLocation = null; + if (snapshot.getRootCauseExecution().isPresent()) { + final Execution rootCauseExecution = snapshot.getRootCauseExecution().get(); + failingTaskName = rootCauseExecution.getVertexWithAttempt(); + taskManagerLocation = rootCauseExecution.getAssignedResourceLocation(); + } + + return createRootExceptionHistoryEntry( + snapshot.getRootCause(), + snapshot.getTimestamp(), + failingTaskName, + taskManagerLocation, + snapshot.getConcurrentlyFailedExecution()); + } /** * Creates a {@code RootExceptionHistoryEntry} representing a global failure from the passed - * {@code Throwable} and timestamp. + * {@code Throwable} and timestamp. If any of the passed {@link Execution Executions} failed, it + * will be added to the {@code RootExceptionHistoryEntry}'s concurrently caught failures. * * @param cause The reason for the failure. * @param timestamp The time the failure was caught. + * @param executions The {@link Execution} instances that shall be analyzed for failures. * @return The {@code RootExceptionHistoryEntry} instance. - * @throws NullPointerException if {@code cause} is {@code null}. + * @throws NullPointerException if {@code failure} is {@code null}. * @throws IllegalArgumentException if the passed {@code timestamp} is not bigger than {@code * 0}. 
*/ - @VisibleForTesting - public static RootExceptionHistoryEntry fromGlobalFailure(Throwable cause, long timestamp) { - return new RootExceptionHistoryEntry(cause, timestamp, null, null); + public static RootExceptionHistoryEntry fromGlobalFailure( + Throwable cause, long timestamp, Iterable<Execution> executions) { + return createRootExceptionHistoryEntry(cause, timestamp, null, null, executions); + } + + private static RootExceptionHistoryEntry createRootExceptionHistoryEntry( + Throwable cause, + long timestamp, + @Nullable String failingTaskName, + @Nullable TaskManagerLocation taskManagerLocation, + Iterable<Execution> executions) { + return new RootExceptionHistoryEntry( + cause, + timestamp, + failingTaskName, + taskManagerLocation, + StreamSupport.stream(executions.spliterator(), false) + .filter(execution -> execution.getFailureInfo().isPresent()) + .map( + execution -> + ExceptionHistoryEntry.create( + execution, execution.getVertexWithAttempt())) + .collect(Collectors.toSet())); } /** - * Creates a {@code RootExceptionHistoryEntry} representing a local failure using the passed - * information. + * Instantiates a {@code RootExceptionHistoryEntry}. * * @param cause The reason for the failure. * @param timestamp The time the failure was caught. * @param failingTaskName The name of the task that failed. - * @param taskManagerLocation The {@link TaskManagerLocation} the task was running on. - * @return The {@code RootExceptionHistoryEntry} instance. - * @throws NullPointerException if {@code cause} or {@code failingTaskName} are {@code null}. + * @param taskManagerLocation The host the task was running on. + * @throws NullPointerException if {@code cause} is {@code null}. * @throws IllegalArgumentException if the passed {@code timestamp} is not bigger than {@code * 0}. 
*/ @VisibleForTesting - public static RootExceptionHistoryEntry fromLocalFailure( - Throwable cause, - long timestamp, - String failingTaskName, - @Nullable TaskManagerLocation taskManagerLocation) { - return new RootExceptionHistoryEntry( - cause, timestamp, checkNotNull(failingTaskName), taskManagerLocation); - } - - private RootExceptionHistoryEntry( + public RootExceptionHistoryEntry( Throwable cause, long timestamp, @Nullable String failingTaskName, - @Nullable TaskManagerLocation taskManagerLocation) { + @Nullable TaskManagerLocation taskManagerLocation, + Set<ExceptionHistoryEntry> concurrentExceptions) { super(cause, timestamp, failingTaskName, taskManagerLocation); - } - - void add(ExceptionHistoryEntry concurrentException) { - concurrentExceptions.add(concurrentException); + this.concurrentExceptions = concurrentExceptions; } public Iterable<ExceptionHistoryEntry> getConcurrentExceptions() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java index e65eba4..ff8d8c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java @@ -114,8 +114,7 @@ public class JobExceptionsHandlerTest extends TestLogger { final long rootCauseTimestamp = System.currentTimeMillis(); final ExecutionGraphInfo executionGraphInfo = - createExecutionGraphInfo( - RootExceptionHistoryEntry.fromGlobalFailure(rootCause, rootCauseTimestamp)); + createExecutionGraphInfo(fromGlobalFailure(rootCause, rootCauseTimestamp)); final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> request = createRequest(executionGraphInfo.getJobId(), 10); final JobExceptionsInfoWithHistory response = @@ -134,14 +133,14 @@ public class JobExceptionsHandlerTest extends TestLogger { 
@Test public void testWithExceptionHistory() throws HandlerRequestException { final RootExceptionHistoryEntry rootCause = - RootExceptionHistoryEntry.fromGlobalFailure( - new RuntimeException("exception #0"), System.currentTimeMillis()); + fromGlobalFailure(new RuntimeException("exception #0"), System.currentTimeMillis()); final RootExceptionHistoryEntry otherFailure = - RootExceptionHistoryEntry.fromLocalFailure( + new RootExceptionHistoryEntry( new RuntimeException("exception #1"), System.currentTimeMillis(), "task name", - new LocalTaskManagerLocation()); + new LocalTaskManagerLocation(), + Collections.emptySet()); final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(rootCause, otherFailure); @@ -168,14 +167,14 @@ public class JobExceptionsHandlerTest extends TestLogger { public void testWithExceptionHistoryWithTruncationThroughParameter() throws HandlerRequestException { final RootExceptionHistoryEntry rootCause = - RootExceptionHistoryEntry.fromGlobalFailure( - new RuntimeException("exception #0"), System.currentTimeMillis()); + fromGlobalFailure(new RuntimeException("exception #0"), System.currentTimeMillis()); final RootExceptionHistoryEntry otherFailure = - RootExceptionHistoryEntry.fromLocalFailure( + new RootExceptionHistoryEntry( new RuntimeException("exception #1"), System.currentTimeMillis(), "task name", - new LocalTaskManagerLocation()); + new LocalTaskManagerLocation(), + Collections.emptySet()); final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(rootCause, otherFailure); @@ -264,11 +263,12 @@ public class JobExceptionsHandlerTest extends TestLogger { final long failureTimestamp = System.currentTimeMillis(); final List<RootExceptionHistoryEntry> exceptionHistory = Collections.singletonList( - RootExceptionHistoryEntry.fromLocalFailure( + new RootExceptionHistoryEntry( failureCause, failureTimestamp, "test task #1", - new LocalTaskManagerLocation())); + new LocalTaskManagerLocation(), + Collections.emptySet())); 
return new ExecutionGraphInfo( new ArchivedExecutionGraphBuilder() .setFailureCause(new ErrorInfo(failureCause, failureTimestamp)) @@ -357,6 +357,10 @@ public class JobExceptionsHandlerTest extends TestLogger { queryParameters); } + private static RootExceptionHistoryEntry fromGlobalFailure(Throwable cause, long timestamp) { + return new RootExceptionHistoryEntry(cause, timestamp, null, null, Collections.emptySet()); + } + // -------- factory methods for instantiating new Matchers -------- @SafeVarargs diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 8f8c102..dad9a41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -1207,15 +1207,14 @@ public class DefaultSchedulerTest extends TestLogger { executionVertex1.getCurrentAssignedResourceLocation()))); final RootExceptionHistoryEntry entry1 = actualExceptionHistory.next(); - assertThat(entry1.getConcurrentExceptions(), IsEmptyIterable.emptyIterable()); - FlinkException expectedFailure = - new FlinkException( - "This is a workaround for FLINK-22276: The actual failure was cleaned up already."); assertThat( - entry1.getException() - .deserializeError(ClassLoader.getSystemClassLoader()) - .getMessage(), - is(expectedFailure.getMessage())); + entry1, + is( + ExceptionHistoryEntryMatcher.matchesFailure( + exception1, + updateStateTriggeringRestartTimestamp1, + executionVertex1.getTaskNameWithSubtaskIndex(), + executionVertex1.getCurrentAssignedResourceLocation()))); assertThat(entry1.getConcurrentExceptions(), IsEmptyIterable.emptyIterable()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java new file mode 100644 index 0000000..fa2225b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.exceptionhistory; + +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.Optional; + +import static org.apache.flink.runtime.scheduler.exceptionhistory.ArchivedTaskManagerLocationMatcher.isArchivedTaskManagerLocation; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** {@code ExceptionHistoryEntryTest} tests the creation of {@link ExceptionHistoryEntry}. 
*/ +public class ExceptionHistoryEntryTest extends TestLogger { + + @Test + public void testCreate() { + final Throwable failure = new RuntimeException("Expected exception"); + final long timestamp = System.currentTimeMillis(); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final AccessExecution execution = + new TestAccessExecution( + new ExecutionAttemptID(), failure, timestamp, taskManagerLocation); + final String taskName = "task name"; + + final ExceptionHistoryEntry entry = ExceptionHistoryEntry.create(execution, taskName); + + assertThat( + entry.getException().deserializeError(ClassLoader.getSystemClassLoader()), + is(failure)); + assertThat(entry.getTimestamp(), is(timestamp)); + assertThat(entry.getFailingTaskName(), is(taskName)); + assertThat( + entry.getTaskManagerLocation(), isArchivedTaskManagerLocation(taskManagerLocation)); + assertThat(entry.isGlobal(), is(false)); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreationFailure() { + ExceptionHistoryEntry.create( + TestAccessExecution.createExecutionWithoutFailure( + new ExecutionAttemptID(), new LocalTaskManagerLocation()), + "task name"); + } + + private static class TestAccessExecution implements AccessExecution { + + private final ExecutionAttemptID executionAttemptID; + private final ErrorInfo failureInfo; + private final TaskManagerLocation taskManagerLocation; + + private static TestAccessExecution createExecutionWithoutFailure( + ExecutionAttemptID executionAttemptID, TaskManagerLocation taskManagerLocation) { + return new TestAccessExecution(executionAttemptID, null, 0L, taskManagerLocation); + } + + private TestAccessExecution( + ExecutionAttemptID executionAttemptID, + @Nullable Throwable failure, + long timestamp, + TaskManagerLocation taskManagerLocation) { + this.executionAttemptID = executionAttemptID; + this.failureInfo = failure == null ? 
null : new ErrorInfo(failure, timestamp); + this.taskManagerLocation = taskManagerLocation; + } + + @Override + public ExecutionAttemptID getAttemptId() { + return executionAttemptID; + } + + @Override + public TaskManagerLocation getAssignedResourceLocation() { + return taskManagerLocation; + } + + @Override + public Optional<ErrorInfo> getFailureInfo() { + return Optional.ofNullable(failureInfo); + } + + // -- unsupported methods + + @Override + public int getAttemptNumber() { + throw new UnsupportedOperationException("getAttemptNumber should not be called."); + } + + @Override + public long[] getStateTimestamps() { + throw new UnsupportedOperationException("getStateTimestamps should not be called."); + } + + @Override + public ExecutionState getState() { + throw new UnsupportedOperationException("getState should not be called."); + } + + @Override + public long getStateTimestamp(ExecutionState state) { + throw new UnsupportedOperationException("getStateTimestamp should not be called."); + } + + @Override + public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() { + throw new UnsupportedOperationException( + "getUserAccumulatorsStringified should not be called."); + } + + @Override + public int getParallelSubtaskIndex() { + throw new UnsupportedOperationException( + "getParallelSubtaskIndex should not be called."); + } + + @Override + public IOMetrics getIOMetrics() { + throw new UnsupportedOperationException("getIOMetrics should not be called."); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java new file mode 100644 index 0000000..e541164 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.exceptionhistory; + +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; +import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import 
org.apache.flink.util.FlinkException; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.hamcrest.collection.IsIterableContainingInAnyOrder; +import org.hamcrest.collection.IsIterableContainingInOrder; +import org.hamcrest.core.IsInstanceOf; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * {@code FailureHandlingResultSnapshotTest} tests the creation of {@link + * FailureHandlingResultSnapshot}. + */ +public class FailureHandlingResultSnapshotTest extends TestLogger { + + private ExecutionGraph executionGraph; + + @Before + public void setup() throws JobException, JobExecutionException { + final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); + jobGraph.getVertices().forEach(v -> v.setParallelism(3)); + + executionGraph = + TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build(); + executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + } + + @Test(expected = IllegalArgumentException.class) + public void testRootCauseVertexNotFailed() { + final ExecutionVertex rootCauseExecutionVertex = extractExecutionVertex(0); + final FailureHandlingResult failureHandlingResult = + FailureHandlingResult.restartable( + rootCauseExecutionVertex.getID(), + new RuntimeException("Expected exception: root cause"), + System.currentTimeMillis(), + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getID) + .collect(Collectors.toSet()), + 0L, + false); + + FailureHandlingResultSnapshot.create(failureHandlingResult, this::getLatestExecution); + } + + @Test // see FLINK-22060/FLINK-21376 + public void 
testMissingThrowableHandling() { + final ExecutionVertex rootCauseExecutionVertex = extractExecutionVertex(0); + + final long rootCauseTimestamp = triggerFailure(rootCauseExecutionVertex, null); + + final FailureHandlingResult failureHandlingResult = + FailureHandlingResult.restartable( + rootCauseExecutionVertex.getID(), + null, + rootCauseTimestamp, + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getID) + .collect(Collectors.toSet()), + 0L, + false); + + final FailureHandlingResultSnapshot testInstance = + FailureHandlingResultSnapshot.create( + failureHandlingResult, this::getLatestExecution); + + final Throwable actualException = + new SerializedThrowable(testInstance.getRootCause()) + .deserializeError(ClassLoader.getSystemClassLoader()); + assertThat(actualException, IsInstanceOf.instanceOf(FlinkException.class)); + assertThat( + actualException, + FlinkMatchers.containsMessage(ErrorInfo.handleMissingThrowable(null).getMessage())); + assertThat(testInstance.getTimestamp(), is(rootCauseTimestamp)); + assertThat(testInstance.getRootCauseExecution().isPresent(), is(true)); + assertThat( + testInstance.getRootCauseExecution().get(), + is(rootCauseExecutionVertex.getCurrentExecutionAttempt())); + } + + @Test + public void testLocalFailureHandlingResultSnapshotCreation() { + final ExecutionVertex rootCauseExecutionVertex = extractExecutionVertex(0); + final Throwable rootCause = new RuntimeException("Expected exception: root cause"); + final ExecutionVertex otherFailedExecutionVertex = extractExecutionVertex(1); + final Throwable otherFailure = + new IllegalStateException("Expected exception: other failure"); + + final long rootCauseTimestamp = triggerFailure(rootCauseExecutionVertex, rootCause); + triggerFailure(otherFailedExecutionVertex, otherFailure); + + final FailureHandlingResult failureHandlingResult = + FailureHandlingResult.restartable( + rootCauseExecutionVertex.getID(), + rootCause, + 
rootCauseTimestamp, + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getID) + .collect(Collectors.toSet()), + 0L, + false); + + final FailureHandlingResultSnapshot testInstance = + FailureHandlingResultSnapshot.create( + failureHandlingResult, this::getLatestExecution); + + assertThat(testInstance.getRootCause(), is(rootCause)); + assertThat(testInstance.getTimestamp(), is(rootCauseTimestamp)); + assertThat(testInstance.getRootCauseExecution().isPresent(), is(true)); + assertThat( + testInstance.getRootCauseExecution().get(), + is(rootCauseExecutionVertex.getCurrentExecutionAttempt())); + + assertThat( + testInstance.getConcurrentlyFailedExecution(), + IsIterableContainingInOrder.contains( + otherFailedExecutionVertex.getCurrentExecutionAttempt())); + } + + @Test(expected = IllegalArgumentException.class) + public void testFailureHandlingWithRootCauseExecutionBeingPartOfConcurrentlyFailedExecutions() { + final Execution rootCauseExecution = extractExecutionVertex(0).getCurrentExecutionAttempt(); + new FailureHandlingResultSnapshot( + rootCauseExecution, + new RuntimeException("Expected exception"), + System.currentTimeMillis(), + Collections.singleton(rootCauseExecution)); + } + + @Test + public void testGlobalFailureHandlingResultSnapshotCreation() { + final Throwable rootCause = new FlinkException("Expected exception: root cause"); + final long timestamp = System.currentTimeMillis(); + + final ExecutionVertex failedExecutionVertex0 = extractExecutionVertex(0); + final Throwable failure0 = new RuntimeException("Expected exception: failure #0"); + final ExecutionVertex failedExecutionVertex1 = extractExecutionVertex(1); + final Throwable failure1 = new IllegalStateException("Expected exception: failure #1"); + + triggerFailure(failedExecutionVertex0, failure0); + triggerFailure(failedExecutionVertex1, failure1); + + final FailureHandlingResult failureHandlingResult = + 
FailureHandlingResult.restartable( + null, + rootCause, + timestamp, + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getID) + .collect(Collectors.toSet()), + 0L, + true); + + final FailureHandlingResultSnapshot testInstance = + FailureHandlingResultSnapshot.create( + failureHandlingResult, this::getLatestExecution); + + assertThat(testInstance.getRootCause(), is(rootCause)); + assertThat(testInstance.getTimestamp(), is(timestamp)); + assertThat(testInstance.getRootCauseExecution().isPresent(), is(false)); + + assertThat( + testInstance.getConcurrentlyFailedExecution(), + IsIterableContainingInAnyOrder.containsInAnyOrder( + failedExecutionVertex0.getCurrentExecutionAttempt(), + failedExecutionVertex1.getCurrentExecutionAttempt())); + } + + private Execution getLatestExecution(ExecutionVertexID executionVertexId) { + if (!executionGraph.getAllVertices().containsKey(executionVertexId.getJobVertexId())) { + throw new IllegalArgumentException( + "The ExecutionJobVertex having the ID " + + executionVertexId.getJobVertexId() + + " does not exist."); + } + + final ExecutionVertex[] executions = + executionGraph + .getAllVertices() + .get(executionVertexId.getJobVertexId()) + .getTaskVertices(); + + if (executions.length <= executionVertexId.getSubtaskIndex()) { + throw new IllegalArgumentException( + "The ExecutionVertex having the subtask ID " + + executionVertexId.getSubtaskIndex() + + " for ExecutionJobVertex " + + executionVertexId.getJobVertexId() + + " does not exist."); + } + + return executions[executionVertexId.getSubtaskIndex()].getCurrentExecutionAttempt(); + } + + private long triggerFailure(ExecutionVertex executionVertex, Throwable throwable) { + executionGraph.updateState( + new TaskExecutionStateTransition( + new TaskExecutionState( + executionVertex.getCurrentExecutionAttempt().getAttemptId(), + ExecutionState.FAILED, + throwable))); + + return executionVertex + .getFailureInfo() + 
.orElseThrow( + () -> + new IllegalArgumentException( + "The transition into failed state didn't succeed for ExecutionVertex " + + executionVertex.getID() + + ".")) + .getTimestamp(); + } + + private ExecutionVertex extractExecutionVertex(int pos) { + final ExecutionVertex executionVertex = + Iterables.get(executionGraph.getAllExecutionVertices(), pos); + executionVertex.tryAssignResource( + new TestingLogicalSlotBuilder().createTestingLogicalSlot()); + + return executionVertex; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java similarity index 65% rename from flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractorTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java index 844d9b9..10821cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java @@ -28,29 +28,27 @@ import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; +import 
org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -import java.util.Arrays; import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.junit.Assert.assertThat; -/** {@code ExceptionHistoryEntryExtractorTest} tests {@link ExceptionHistoryEntryExtractor}. */ -public class ExceptionHistoryEntryExtractorTest extends TestLogger { - - private final ExceptionHistoryEntryExtractor testInstance = - new ExceptionHistoryEntryExtractor(); +/** + * {@code RootExceptionHistoryEntryTest} tests the creation of {@link RootExceptionHistoryEntry}. + */ +public class RootExceptionHistoryEntryTest extends TestLogger { private ExecutionGraph executionGraph; @@ -64,65 +62,8 @@ public class ExceptionHistoryEntryExtractorTest extends TestLogger { executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); } - @Test(expected = IllegalArgumentException.class) - public void testWrongExecutionVertexIdOfFailingVertex() { - testInstance.extractLocalFailure( - executionGraph.getAllVertices(), - new ExecutionVertexID(new JobVertexID(), 0), - Collections.emptyList()); - } - - @Test(expected = IllegalArgumentException.class) - public void testWrongSubtaskIndexOfFailingVertex() { - final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0); - triggerFailure(rootExecutionVertex, new RuntimeException("Expected root cause")); - testInstance.extractLocalFailure( - executionGraph.getAllVertices(), - new ExecutionVertexID( - rootExecutionVertex.getID().getJobVertexId(), Integer.MAX_VALUE), - Collections.emptyList()); - } - - @Ignore // disabled due to FLINK-22276 workaround in ExceptionHistoryEntryExtractor - @Test(expected = IllegalArgumentException.class) - public void testRootExecutionVertexIdNotFailed() { - final ExecutionVertex rootExecutionVertex = 
extractExecutionVertex(0); - testInstance.extractLocalFailure( - executionGraph.getAllVertices(), - rootExecutionVertex.getID(), - Collections.emptyList()); - } - - @Test(expected = IllegalArgumentException.class) - public void testWrongExecutionVertexIdOfConcurrentlyFailedVertex() { - final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0); - triggerFailure(rootExecutionVertex, new RuntimeException("Expected root cause")); - testInstance.extractLocalFailure( - executionGraph.getAllVertices(), - rootExecutionVertex.getID(), - Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0))); - } - - @Test(expected = IllegalArgumentException.class) - public void testWrongSubtaskIndexOfConcurrentlyFailedVertex() { - final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0); - triggerFailure(rootExecutionVertex, new RuntimeException("Expected root cause")); - final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1); - triggerFailure( - concurrentlyFailedExecutionVertex, - new RuntimeException("Expected concurrent failure")); - - testInstance.extractLocalFailure( - executionGraph.getAllVertices(), - rootExecutionVertex.getID(), - Collections.singleton( - new ExecutionVertexID( - concurrentlyFailedExecutionVertex.getJobvertexId(), - Integer.MAX_VALUE))); - } - @Test - public void extractLocalFailure() { + public void testFromFailureHandlingResultSnapshot() { final Throwable rootException = new RuntimeException("Expected root failure"); final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0); final long rootTimestamp = triggerFailure(rootExecutionVertex, rootException); @@ -132,15 +73,15 @@ public class ExceptionHistoryEntryExtractorTest extends TestLogger { final long concurrentExceptionTimestamp = triggerFailure(concurrentlyFailedExecutionVertex, concurrentException); - final ExecutionVertex notFailedExecutionVertex = extractExecutionVertex(2); - + final FailureHandlingResultSnapshot snapshot = + new 
FailureHandlingResultSnapshot( + rootExecutionVertex.getCurrentExecutionAttempt(), + rootException, + rootTimestamp, + Collections.singleton( + concurrentlyFailedExecutionVertex.getCurrentExecutionAttempt())); final RootExceptionHistoryEntry actualEntry = - testInstance.extractLocalFailure( - executionGraph.getAllVertices(), - rootExecutionVertex.getID(), - Arrays.asList( - concurrentlyFailedExecutionVertex.getID(), - notFailedExecutionVertex.getID())); + RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(snapshot); assertThat( actualEntry, @@ -161,7 +102,7 @@ public class ExceptionHistoryEntryExtractorTest extends TestLogger { } @Test - public void extractGlobalFailure() { + public void testFromGlobalFailure() { final Throwable concurrentException0 = new RuntimeException("Expected concurrent failure #0"); final ExecutionVertex concurrentlyFailedExecutionVertex0 = extractExecutionVertex(0); @@ -177,15 +118,21 @@ public class ExceptionHistoryEntryExtractorTest extends TestLogger { final Throwable rootCause = new Exception("Expected root failure"); final long rootTimestamp = System.currentTimeMillis(); final RootExceptionHistoryEntry actualEntry = - testInstance.extractGlobalFailure( - executionGraph.getAllExecutionVertices(), rootCause, rootTimestamp); + RootExceptionHistoryEntry.fromGlobalFailure( + rootCause, + rootTimestamp, + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .collect(Collectors.toSet())); assertThat( actualEntry, ExceptionHistoryEntryMatcher.matchesGlobalFailure(rootCause, rootTimestamp)); assertThat( actualEntry.getConcurrentExceptions(), - IsIterableContainingInOrder.contains( + IsIterableContainingInAnyOrder.containsInAnyOrder( ExceptionHistoryEntryMatcher.matchesFailure( concurrentException0, concurrentExceptionTimestamp0,
