This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f4105f65fb510de11ccb08c4b31d5e4c30744151
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Apr 15 10:12:25 2021 +0200

    [FLINK-22276][runtime] Fixes the concurrency issue
    
    This commit fixes an issue where multiple failures can occur close to each 
other.
    In that case, the DefaultScheduler's restart logic competes for each of 
these
    failures. If multiple failures refer to the same Execution, it might be 
that the
    restart due to one failure handling cleans up the failure already. This 
leads to
    an IllegalArgumentException when archiving the next failure referring to the 
same
    Execution. The issue was that the code relied on ExecutionVertices instead 
of
    Executions.
    
    The new implementation relies on the Executions that were present when the
    failure was handled. Therefore, FailureHandlingResultSnapshot is 
introduced. It
    extracts the Execution information from the ExecutionGraph.
    
    Additionally, instead of calling 
ExecutionVertex.getTaskNameWithSubtaskIndex()
    to collect the task name, the new implementation relies on
    Execution.getVertexWithAttempt(). This enables us to solely rely on the 
Execution
    without an extra dependency on the ExecutionVertex.
    
    The new implementation also removes the add method from 
RootExceptionHistoryEntry.
    This makes the instantiation cleaner. ExceptionHistoryEntryExtractor was 
replaced
    by the factory methods 
RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot and
    RootExceptionHistoryEntry.fromGlobalFailure as part of this 
effort.
    
    This closes #15640.
---
 .../flink/runtime/executiongraph/ErrorInfo.java    |  20 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  |   8 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |  43 ++--
 .../exceptionhistory/ExceptionHistoryEntry.java    |  38 ++-
 .../ExceptionHistoryEntryExtractor.java            | 157 ------------
 .../FailureHandlingResultSnapshot.java             | 152 ++++++++++++
 .../RootExceptionHistoryEntry.java                 |  97 +++++---
 .../rest/handler/job/JobExceptionsHandlerTest.java |  28 ++-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  15 +-
 .../ExceptionHistoryEntryTest.java                 | 149 +++++++++++
 .../FailureHandlingResultSnapshotTest.java         | 276 +++++++++++++++++++++
 ...est.java => RootExceptionHistoryEntryTest.java} | 105 ++------
 12 files changed, 773 insertions(+), 315 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
index cfbfc6f..0663ea1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
@@ -49,12 +49,20 @@ public class ErrorInfo implements Serializable {
      */
     public static ErrorInfo createErrorInfoWithNullableCause(
             @Nullable Throwable exception, long timestamp) {
-        return new ErrorInfo(
-                exception != null
-                        ? exception
-                        : new FlinkException(
-                                "Unknown cause for Execution failure (this 
might be caused by FLINK-21376)."),
-                timestamp);
+        return new ErrorInfo(handleMissingThrowable(exception), timestamp);
+    }
+
+    /**
+     * Utility method to cover FLINK-21376.
+     *
+     * @param throwable The actual exception.
+     * @return a {@link FlinkException} if no exception was passed.
+     */
+    public static Throwable handleMissingThrowable(@Nullable Throwable 
throwable) {
+        return throwable != null
+                ? throwable
+                : new FlinkException(
+                        "Unknown cause for Execution failure (this might be 
caused by FLINK-21376).");
     }
 
     public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 33e8636..c9d77de 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
+import 
org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
@@ -251,12 +252,17 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
         final CompletableFuture<?> cancelFuture = 
cancelTasksAsync(verticesToRestart);
 
+        final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
+                FailureHandlingResultSnapshot.create(
+                        failureHandlingResult,
+                        id -> 
this.getExecutionVertex(id).getCurrentExecutionAttempt());
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
                                 cancelFuture.thenRunAsync(
                                         () -> {
-                                            
archiveFromFailureHandlingResult(failureHandlingResult);
+                                            archiveFromFailureHandlingResult(
+                                                    
failureHandlingResultSnapshot);
                                             
restartTasks(executionVertexVersions, globalRecovery);
                                         },
                                         getMainThreadExecutor())),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 0ab5672..c675e66 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -52,7 +52,6 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
-import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -75,7 +74,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
-import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryExtractor;
+import 
org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
 import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
@@ -148,7 +147,6 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
 
     private final ComponentMainThreadExecutor mainThreadExecutor;
 
-    private final ExceptionHistoryEntryExtractor 
exceptionHistoryEntryExtractor;
     private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;
 
     private final ExecutionGraphFactory executionGraphFactory;
@@ -215,7 +213,6 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
                 new DefaultOperatorCoordinatorHandler(executionGraph, 
this::handleGlobalFailure);
         
operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor);
 
-        this.exceptionHistoryEntryExtractor = new 
ExceptionHistoryEntryExtractor();
         this.exceptionHistory =
                 new BoundedFIFOQueue<>(
                         
jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
@@ -629,38 +626,42 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
         return executionGraph.getTerminationFuture();
     }
 
-    protected final void archiveGlobalFailure(@Nullable Throwable failure) {
-        archiveGlobalFailure(failure, 
executionGraph.getStatusTimestamp(JobStatus.FAILED));
+    protected final void archiveGlobalFailure(Throwable failure) {
+        archiveGlobalFailure(
+                failure,
+                executionGraph.getStatusTimestamp(JobStatus.FAILED),
+                
StreamSupport.stream(executionGraph.getAllExecutionVertices().spliterator(), 
false)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .collect(Collectors.toSet()));
     }
 
-    private void archiveGlobalFailure(@Nullable Throwable failure, long 
timestamp) {
+    private void archiveGlobalFailure(
+            Throwable failure, long timestamp, Iterable<Execution> executions) 
{
         exceptionHistory.add(
-                exceptionHistoryEntryExtractor.extractGlobalFailure(
-                        executionGraph.getAllExecutionVertices(), failure, 
timestamp));
+                RootExceptionHistoryEntry.fromGlobalFailure(failure, 
timestamp, executions));
         log.debug("Archive global failure.", failure);
     }
 
     protected final void archiveFromFailureHandlingResult(
-            FailureHandlingResult failureHandlingResult) {
-        if 
(failureHandlingResult.getExecutionVertexIdOfFailedTask().isPresent()) {
-            final ExecutionVertexID executionVertexId =
-                    
failureHandlingResult.getExecutionVertexIdOfFailedTask().get();
+            FailureHandlingResultSnapshot failureHandlingResult) {
+        if (failureHandlingResult.getRootCauseExecution().isPresent()) {
+            final Execution rootCauseExecution =
+                    failureHandlingResult.getRootCauseExecution().get();
+
             final RootExceptionHistoryEntry rootEntry =
-                    exceptionHistoryEntryExtractor.extractLocalFailure(
-                            executionGraph.getAllVertices(),
-                            executionVertexId,
-                            
failureHandlingResult.getVerticesToRestart().stream()
-                                    .filter(v -> !executionVertexId.equals(v))
-                                    .collect(Collectors.toSet()));
+                    
RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(
+                            failureHandlingResult);
             exceptionHistory.add(rootEntry);
 
             log.debug(
                     "Archive local failure causing attempt {} to fail: {}",
-                    executionVertexId,
+                    rootCauseExecution.getAttemptId(),
                     rootEntry.getExceptionAsString());
         } else {
             archiveGlobalFailure(
-                    failureHandlingResult.getError(), 
failureHandlingResult.getTimestamp());
+                    failureHandlingResult.getRootCause(),
+                    failureHandlingResult.getTimestamp(),
+                    failureHandlingResult.getConcurrentlyFailedExecution());
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
index 08a69ef..40b782d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.scheduler.exceptionhistory;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -39,7 +42,40 @@ public class ExceptionHistoryEntry extends ErrorInfo {
     @Nullable private final String failingTaskName;
     @Nullable private final ArchivedTaskManagerLocation taskManagerLocation;
 
-    ExceptionHistoryEntry(
+    /**
+     * Creates an {@code ExceptionHistoryEntry} based on the provided {@code 
Execution}.
+     *
+     * @param failedExecution the failed {@code Execution}.
+     * @param taskName the name of the task.
+     * @return The {@code ExceptionHistoryEntry}.
+     * @throws IllegalArgumentException if the passed {@code Execution} does 
not provide a {@link
+     *     Execution#getFailureInfo() failureInfo}.
+     */
+    public static ExceptionHistoryEntry create(AccessExecution 
failedExecution, String taskName) {
+        Preconditions.checkArgument(
+                failedExecution.getFailureInfo().isPresent(),
+                "The selected Execution " + failedExecution.getAttemptId() + " 
didn't fail.");
+
+        final ErrorInfo failure = failedExecution.getFailureInfo().get();
+        return new ExceptionHistoryEntry(
+                failure.getException(),
+                failure.getTimestamp(),
+                taskName,
+                failedExecution.getAssignedResourceLocation());
+    }
+
+    /**
+     * Instantiates a {@code ExceptionHistoryEntry}.
+     *
+     * @param cause The reason for the failure.
+     * @param timestamp The time the failure was caught.
+     * @param failingTaskName The name of the task that failed.
+     * @param taskManagerLocation The host the task was running on.
+     * @throws NullPointerException if {@code cause} is {@code null}.
+     * @throws IllegalArgumentException if the passed {@code timestamp} is not 
bigger than {@code
+     *     0}.
+     */
+    protected ExceptionHistoryEntry(
             Throwable cause,
             long timestamp,
             @Nullable String failingTaskName,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractor.java
deleted file mode 100644
index 98d4dcd..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractor.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.exceptionhistory;
-
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedThrowable;
-import org.apache.flink.util.function.QuadFunction;
-
-import java.util.Map;
-
-/**
- * {@code ExceptionHistoryEntryExtractor} extracts all the necessary 
information from given
- * executions to create corresponding {@link RootExceptionHistoryEntry 
RootExceptionHistoryEntries}.
- */
-public class ExceptionHistoryEntryExtractor {
-
-    /**
-     * Extracts a {@link RootExceptionHistoryEntry} based on the passed local 
failure information.
-     *
-     * @param executionJobVertices The {@link ExecutionJobVertex} instances 
registry.
-     * @param failedExecutionVertexId The {@link ExecutionVertexID} referring 
to the {@link
-     *     ExecutionVertex} that is the root of the failure.
-     * @param otherAffectedVertices The {@code ExecutionVertexID}s of other 
affected {@code
-     *     ExecutionVertices} that, if failed as well, would be added as 
concurrent failures.
-     * @return The {@code RootExceptionHistoryEntry}.
-     * @throws IllegalArgumentException if one of the passed {@code 
ExecutionVertexID}s cannot be
-     *     resolved into an {@code ExecutionVertex}.
-     * @throws IllegalArgumentException if the {@code failedExecutionVertexID} 
refers to an {@code
-     *     ExecutionVertex} that didn't fail.
-     */
-    public RootExceptionHistoryEntry extractLocalFailure(
-            Map<JobVertexID, ExecutionJobVertex> executionJobVertices,
-            ExecutionVertexID failedExecutionVertexId,
-            Iterable<ExecutionVertexID> otherAffectedVertices) {
-        final ExecutionVertex rootCauseExecutionVertex =
-                getExecutionVertex(executionJobVertices, 
failedExecutionVertexId);
-
-        if (rootCauseExecutionVertex.getFailureInfo().isPresent()) {
-            final RootExceptionHistoryEntry root =
-                    createLocalExceptionHistoryEntry(
-                            RootExceptionHistoryEntry::fromLocalFailure, 
rootCauseExecutionVertex);
-
-            for (ExecutionVertexID otherExecutionVertexId : 
otherAffectedVertices) {
-                final ExecutionVertex executionVertex =
-                        getExecutionVertex(executionJobVertices, 
otherExecutionVertexId);
-                if (executionVertex.getFailureInfo().isPresent()) {
-                    root.add(
-                            createLocalExceptionHistoryEntry(
-                                    ExceptionHistoryEntry::new, 
executionVertex));
-                }
-            }
-
-            return root;
-        }
-
-        return RootExceptionHistoryEntry.fromGlobalFailure(
-                new FlinkException(
-                        "This is a workaround for FLINK-22276: The actual 
failure was cleaned up already."),
-                System.currentTimeMillis());
-    }
-
-    /**
-     * Extracts a {@link RootExceptionHistoryEntry} based on the global 
failure information.
-     *
-     * @param executionVertices The {@link ExecutionVertex ExecutionVertices} 
that are affected by
-     *     the global failure and, if failed as well, would be added as 
concurrent failures to the
-     *     entry.
-     * @param rootCause The {@code Throwable} causing the failure.
-     * @param timestamp The timestamp the failure occurred.
-     * @return The {@code RootExceptionHistoryEntry}.
-     * @throws IllegalArgumentException if one of the passed {@code 
ExecutionVertexID}s cannot be *
-     *     resolved into an {@code ExecutionVertex}.
-     */
-    public RootExceptionHistoryEntry extractGlobalFailure(
-            Iterable<ExecutionVertex> executionVertices, Throwable rootCause, 
long timestamp) {
-        final RootExceptionHistoryEntry root =
-                RootExceptionHistoryEntry.fromGlobalFailure(rootCause, 
timestamp);
-
-        for (ExecutionVertex executionVertex : executionVertices) {
-            if (!executionVertex.getFailureInfo().isPresent()) {
-                continue;
-            }
-
-            final ExceptionHistoryEntry exceptionHistoryEntry =
-                    
createLocalExceptionHistoryEntry(ExceptionHistoryEntry::new, executionVertex);
-            if (exceptionHistoryEntry != null) {
-                root.add(exceptionHistoryEntry);
-            }
-        }
-
-        return root;
-    }
-
-    private static ExecutionVertex getExecutionVertex(
-            Map<JobVertexID, ExecutionJobVertex> executionJobVertices,
-            ExecutionVertexID executionVertexID) {
-        final ExecutionJobVertex executionJobVertex =
-                executionJobVertices.get(executionVertexID.getJobVertexId());
-
-        Preconditions.checkArgument(
-                executionJobVertex != null,
-                "The passed ExecutionVertexID does not correspond to any 
ExecutionJobVertex provided.");
-        final ExecutionVertex[] executionVertices = 
executionJobVertex.getTaskVertices();
-
-        Preconditions.checkArgument(
-                executionVertices.length > executionVertexID.getSubtaskIndex(),
-                "The ExecutionJobVertex referred by the passed 
ExecutionVertexID does not have the right amount of subtasks (expected subtask 
ID: {}; actual number of subtasks: {}).",
-                executionVertexID.getSubtaskIndex(),
-                executionJobVertex.getTaskVertices().length);
-        return executionVertices[executionVertexID.getSubtaskIndex()];
-    }
-
-    private static <E extends ExceptionHistoryEntry> E 
createLocalExceptionHistoryEntry(
-            QuadFunction<SerializedThrowable, Long, String, 
TaskManagerLocation, E>
-                    exceptionHistoryEntryCreator,
-            ExecutionVertex executionVertex) {
-        return executionVertex
-                .getFailureInfo()
-                .map(
-                        failureInfo ->
-                                exceptionHistoryEntryCreator.apply(
-                                        failureInfo.getException(),
-                                        failureInfo.getTimestamp(),
-                                        
executionVertex.getTaskNameWithSubtaskIndex(),
-                                        executionVertex
-                                                .getCurrentExecutionAttempt()
-                                                
.getAssignedResourceLocation()))
-                .orElseThrow(
-                        () ->
-                                new IllegalArgumentException(
-                                        "The selected ExecutionVertex "
-                                                + executionVertex.getID()
-                                                + " didn't fail."));
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
new file mode 100644
index 0000000..7ec115a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link 
FailureHandlingResult}
+ * providing the actual {@link Execution Executions}.
+ */
+public class FailureHandlingResultSnapshot {
+
+    @Nullable private final Execution rootCauseExecution;
+    private final Throwable rootCause;
+    private final long timestamp;
+    private final Set<Execution> concurrentlyFailedExecutions;
+
+    /**
+     * Creates a {@code FailureHandlingResultSnapshot} based on the passed 
{@link
+     * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
+     *
+     * @param failureHandlingResult The {@code FailureHandlingResult} that is 
used for extracting
+     *     the failure information.
+     * @param latestExecutionLookup The look-up function for retrieving the 
latest {@link Execution}
+     *     instance for a given {@link ExecutionVertexID}.
+     * @return The {@code FailureHandlingResultSnapshot}.
+     */
+    public static FailureHandlingResultSnapshot create(
+            FailureHandlingResult failureHandlingResult,
+            Function<ExecutionVertexID, Execution> latestExecutionLookup) {
+        final Execution rootCauseExecution =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(latestExecutionLookup)
+                        .orElse(null);
+        Preconditions.checkArgument(
+                rootCauseExecution == null || 
rootCauseExecution.getFailureInfo().isPresent(),
+                String.format(
+                        "The execution %s didn't provide a failure info even 
though the corresponding ExecutionVertex %s is marked as having handled the 
root cause of this failure.",
+                        // the "(null)" values should never be used due to the 
condition - it's just
+                        // added to make the compiler happy
+                        rootCauseExecution != null ? 
rootCauseExecution.getAttemptId() : "(null)",
+                        failureHandlingResult
+                                .getExecutionVertexIdOfFailedTask()
+                                .map(Objects::toString)
+                                .orElse("(null)")));
+
+        final ExecutionVertexID rootCauseExecutionVertexId =
+                
failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null);
+        final Set<Execution> concurrentlyFailedExecutions =
+                failureHandlingResult.getVerticesToRestart().stream()
+                        .filter(
+                                executionVertexId ->
+                                        
!executionVertexId.equals(rootCauseExecutionVertexId))
+                        .map(latestExecutionLookup)
+                        .filter(execution -> 
execution.getFailureInfo().isPresent())
+                        .collect(Collectors.toSet());
+
+        return new FailureHandlingResultSnapshot(
+                rootCauseExecution,
+                
ErrorInfo.handleMissingThrowable(failureHandlingResult.getError()),
+                failureHandlingResult.getTimestamp(),
+                concurrentlyFailedExecutions);
+    }
+
+    @VisibleForTesting
+    FailureHandlingResultSnapshot(
+            @Nullable Execution rootCauseExecution,
+            Throwable rootCause,
+            long timestamp,
+            Set<Execution> concurrentlyFailedExecutions) {
+        Preconditions.checkArgument(
+                rootCauseExecution == null
+                        || 
!concurrentlyFailedExecutions.contains(rootCauseExecution),
+                "The rootCauseExecution should not be part of the 
concurrentlyFailedExecutions map.");
+
+        this.rootCauseExecution = rootCauseExecution;
+        this.rootCause = Preconditions.checkNotNull(rootCause);
+        this.timestamp = timestamp;
+        this.concurrentlyFailedExecutions =
+                Preconditions.checkNotNull(concurrentlyFailedExecutions);
+    }
+
+    /**
+     * Returns the {@link Execution} that handled the root cause for this 
failure. An empty {@code
+     * Optional} will be returned if it's a global failure.
+     *
+     * @return The {@link Execution} that handled the root cause for this 
failure.
+     */
+    public Optional<Execution> getRootCauseExecution() {
+        return Optional.ofNullable(rootCauseExecution);
+    }
+
+    /**
+     * The actual failure that is handled.
+     *
+     * @return The {@code Throwable}.
+     */
+    public Throwable getRootCause() {
+        return rootCause;
+    }
+
+    /**
+     * The time the failure occurred.
+     *
+     * @return The time of the failure.
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * All {@link Execution Executions} that failed and are planned to be 
restarted as part of this
+     * failure handling.
+     *
+     * @return The concurrently failed {@code Executions}.
+     */
+    public Iterable<Execution> getConcurrentlyFailedExecution() {
+        return Collections.unmodifiableSet(concurrentlyFailedExecutions);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
index 400e3d3..f84b6fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
@@ -19,14 +19,14 @@
 package org.apache.flink.runtime.scheduler.exceptionhistory;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.Collection;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 /**
  * {@code RootExceptionHistoryEntry} extending {@link ExceptionHistoryEntry} 
by providing a list of
@@ -36,57 +36,94 @@ public class RootExceptionHistoryEntry extends 
ExceptionHistoryEntry {
 
     private static final long serialVersionUID = -7647332765867297434L;
 
-    private final Collection<ExceptionHistoryEntry> concurrentExceptions = new 
ArrayList<>();
+    private final Set<ExceptionHistoryEntry> concurrentExceptions;
+
+    /**
+     * Creates a {@code RootExceptionHistoryEntry} based on the passed {@link
+     * FailureHandlingResultSnapshot}.
+     *
+     * @param snapshot The failure handling snapshot the entry is created from.
+     * @return The {@code RootExceptionHistoryEntry} instance.
+     * @throws NullPointerException if the snapshot's root cause is {@code null}.
+     * @throws IllegalArgumentException if the {@code timestamp} of the passed 
{@code
+     *     FailureHandlingResultSnapshot} is not bigger than {@code 0}.
+     */
+    public static RootExceptionHistoryEntry fromFailureHandlingResultSnapshot(
+            FailureHandlingResultSnapshot snapshot) {
+        String failingTaskName = null;
+        TaskManagerLocation taskManagerLocation = null;
+        if (snapshot.getRootCauseExecution().isPresent()) {
+            final Execution rootCauseExecution = 
snapshot.getRootCauseExecution().get();
+            failingTaskName = rootCauseExecution.getVertexWithAttempt();
+            taskManagerLocation = 
rootCauseExecution.getAssignedResourceLocation();
+        }
+
+        return createRootExceptionHistoryEntry(
+                snapshot.getRootCause(),
+                snapshot.getTimestamp(),
+                failingTaskName,
+                taskManagerLocation,
+                snapshot.getConcurrentlyFailedExecution());
+    }
 
     /**
      * Creates a {@code RootExceptionHistoryEntry} representing a global 
failure from the passed
-     * {@code Throwable} and timestamp.
+     * {@code Throwable} and timestamp. Every passed {@link Execution} that failed is added to the
+     * {@code RootExceptionHistoryEntry}'s concurrently caught failures.
      *
      * @param cause The reason for the failure.
      * @param timestamp The time the failure was caught.
+     * @param executions The {@link Execution} instances that shall be 
analyzed for failures.
      * @return The {@code RootExceptionHistoryEntry} instance.
-     * @throws NullPointerException if {@code cause} is {@code null}.
+     * @throws NullPointerException if {@code cause} is {@code null}.
      * @throws IllegalArgumentException if the passed {@code timestamp} is not 
bigger than {@code
      *     0}.
      */
-    @VisibleForTesting
-    public static RootExceptionHistoryEntry fromGlobalFailure(Throwable cause, 
long timestamp) {
-        return new RootExceptionHistoryEntry(cause, timestamp, null, null);
+    public static RootExceptionHistoryEntry fromGlobalFailure(
+            Throwable cause, long timestamp, Iterable<Execution> executions) {
+        return createRootExceptionHistoryEntry(cause, timestamp, null, null, 
executions);
+    }
+
+    private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
+            Throwable cause,
+            long timestamp,
+            @Nullable String failingTaskName,
+            @Nullable TaskManagerLocation taskManagerLocation,
+            Iterable<Execution> executions) {
+        return new RootExceptionHistoryEntry(
+                cause,
+                timestamp,
+                failingTaskName,
+                taskManagerLocation,
+                StreamSupport.stream(executions.spliterator(), false)
+                        .filter(execution -> 
execution.getFailureInfo().isPresent())
+                        .map(
+                                execution ->
+                                        ExceptionHistoryEntry.create(
+                                                execution, 
execution.getVertexWithAttempt()))
+                        .collect(Collectors.toSet()));
     }
 
     /**
-     * Creates a {@code RootExceptionHistoryEntry} representing a local 
failure using the passed
-     * information.
+     * Instantiates a {@code RootExceptionHistoryEntry}.
      *
      * @param cause The reason for the failure.
      * @param timestamp The time the failure was caught.
      * @param failingTaskName The name of the task that failed.
-     * @param taskManagerLocation The {@link TaskManagerLocation} the task was 
running on.
-     * @return The {@code RootExceptionHistoryEntry} instance.
-     * @throws NullPointerException if {@code cause} or {@code 
failingTaskName} are {@code null}.
+     * @param taskManagerLocation The host the task was running on.
+     * @throws NullPointerException if {@code cause} is {@code null}.
      * @throws IllegalArgumentException if the passed {@code timestamp} is not 
bigger than {@code
      *     0}.
      */
     @VisibleForTesting
-    public static RootExceptionHistoryEntry fromLocalFailure(
-            Throwable cause,
-            long timestamp,
-            String failingTaskName,
-            @Nullable TaskManagerLocation taskManagerLocation) {
-        return new RootExceptionHistoryEntry(
-                cause, timestamp, checkNotNull(failingTaskName), 
taskManagerLocation);
-    }
-
-    private RootExceptionHistoryEntry(
+    public RootExceptionHistoryEntry(
             Throwable cause,
             long timestamp,
             @Nullable String failingTaskName,
-            @Nullable TaskManagerLocation taskManagerLocation) {
+            @Nullable TaskManagerLocation taskManagerLocation,
+            Set<ExceptionHistoryEntry> concurrentExceptions) {
         super(cause, timestamp, failingTaskName, taskManagerLocation);
-    }
-
-    void add(ExceptionHistoryEntry concurrentException) {
-        concurrentExceptions.add(concurrentException);
+        this.concurrentExceptions = concurrentExceptions;
     }
 
     public Iterable<ExceptionHistoryEntry> getConcurrentExceptions() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index e65eba4..ff8d8c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -114,8 +114,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
         final long rootCauseTimestamp = System.currentTimeMillis();
 
         final ExecutionGraphInfo executionGraphInfo =
-                createExecutionGraphInfo(
-                        RootExceptionHistoryEntry.fromGlobalFailure(rootCause, 
rootCauseTimestamp));
+                createExecutionGraphInfo(fromGlobalFailure(rootCause, 
rootCauseTimestamp));
         final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
                 createRequest(executionGraphInfo.getJobId(), 10);
         final JobExceptionsInfoWithHistory response =
@@ -134,14 +133,14 @@ public class JobExceptionsHandlerTest extends TestLogger {
     @Test
     public void testWithExceptionHistory() throws HandlerRequestException {
         final RootExceptionHistoryEntry rootCause =
-                RootExceptionHistoryEntry.fromGlobalFailure(
-                        new RuntimeException("exception #0"), 
System.currentTimeMillis());
+                fromGlobalFailure(new RuntimeException("exception #0"), 
System.currentTimeMillis());
         final RootExceptionHistoryEntry otherFailure =
-                RootExceptionHistoryEntry.fromLocalFailure(
+                new RootExceptionHistoryEntry(
                         new RuntimeException("exception #1"),
                         System.currentTimeMillis(),
                         "task name",
-                        new LocalTaskManagerLocation());
+                        new LocalTaskManagerLocation(),
+                        Collections.emptySet());
 
         final ExecutionGraphInfo executionGraphInfo =
                 createExecutionGraphInfo(rootCause, otherFailure);
@@ -168,14 +167,14 @@ public class JobExceptionsHandlerTest extends TestLogger {
     public void testWithExceptionHistoryWithTruncationThroughParameter()
             throws HandlerRequestException {
         final RootExceptionHistoryEntry rootCause =
-                RootExceptionHistoryEntry.fromGlobalFailure(
-                        new RuntimeException("exception #0"), 
System.currentTimeMillis());
+                fromGlobalFailure(new RuntimeException("exception #0"), 
System.currentTimeMillis());
         final RootExceptionHistoryEntry otherFailure =
-                RootExceptionHistoryEntry.fromLocalFailure(
+                new RootExceptionHistoryEntry(
                         new RuntimeException("exception #1"),
                         System.currentTimeMillis(),
                         "task name",
-                        new LocalTaskManagerLocation());
+                        new LocalTaskManagerLocation(),
+                        Collections.emptySet());
 
         final ExecutionGraphInfo executionGraphInfo =
                 createExecutionGraphInfo(rootCause, otherFailure);
@@ -264,11 +263,12 @@ public class JobExceptionsHandlerTest extends TestLogger {
         final long failureTimestamp = System.currentTimeMillis();
         final List<RootExceptionHistoryEntry> exceptionHistory =
                 Collections.singletonList(
-                        RootExceptionHistoryEntry.fromLocalFailure(
+                        new RootExceptionHistoryEntry(
                                 failureCause,
                                 failureTimestamp,
                                 "test task #1",
-                                new LocalTaskManagerLocation()));
+                                new LocalTaskManagerLocation(),
+                                Collections.emptySet()));
         return new ExecutionGraphInfo(
                 new ArchivedExecutionGraphBuilder()
                         .setFailureCause(new ErrorInfo(failureCause, 
failureTimestamp))
@@ -357,6 +357,10 @@ public class JobExceptionsHandlerTest extends TestLogger {
                 queryParameters);
     }
 
+    private static RootExceptionHistoryEntry fromGlobalFailure(Throwable 
cause, long timestamp) {
+        return new RootExceptionHistoryEntry(cause, timestamp, null, null, 
Collections.emptySet());
+    }
+
     // -------- factory methods for instantiating new Matchers --------
 
     @SafeVarargs
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 8f8c102..dad9a41 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1207,15 +1207,14 @@ public class DefaultSchedulerTest extends TestLogger {
                                 
executionVertex1.getCurrentAssignedResourceLocation())));
 
         final RootExceptionHistoryEntry entry1 = actualExceptionHistory.next();
-        assertThat(entry1.getConcurrentExceptions(), 
IsEmptyIterable.emptyIterable());
-        FlinkException expectedFailure =
-                new FlinkException(
-                        "This is a workaround for FLINK-22276: The actual 
failure was cleaned up already.");
         assertThat(
-                entry1.getException()
-                        .deserializeError(ClassLoader.getSystemClassLoader())
-                        .getMessage(),
-                is(expectedFailure.getMessage()));
+                entry1,
+                is(
+                        ExceptionHistoryEntryMatcher.matchesFailure(
+                                exception1,
+                                updateStateTriggeringRestartTimestamp1,
+                                executionVertex1.getTaskNameWithSubtaskIndex(),
+                                
executionVertex1.getCurrentAssignedResourceLocation())));
         assertThat(entry1.getConcurrentExceptions(), 
IsEmptyIterable.emptyIterable());
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
new file mode 100644
index 0000000..fa2225b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.scheduler.exceptionhistory.ArchivedTaskManagerLocationMatcher.isArchivedTaskManagerLocation;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** {@code ExceptionHistoryEntryTest} tests the creation of {@link 
ExceptionHistoryEntry}. */
+public class ExceptionHistoryEntryTest extends TestLogger {
+
+    @Test
+    public void testCreate() {
+        final Throwable failure = new RuntimeException("Expected exception");
+        final long timestamp = System.currentTimeMillis();
+        final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+        final AccessExecution execution =
+                new TestAccessExecution(
+                        new ExecutionAttemptID(), failure, timestamp, 
taskManagerLocation);
+        final String taskName = "task name";
+
+        final ExceptionHistoryEntry entry = 
ExceptionHistoryEntry.create(execution, taskName);
+
+        assertThat(
+                
entry.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                is(failure));
+        assertThat(entry.getTimestamp(), is(timestamp));
+        assertThat(entry.getFailingTaskName(), is(taskName));
+        assertThat(
+                entry.getTaskManagerLocation(), 
isArchivedTaskManagerLocation(taskManagerLocation));
+        assertThat(entry.isGlobal(), is(false));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCreationFailure() {
+        ExceptionHistoryEntry.create(
+                TestAccessExecution.createExecutionWithoutFailure(
+                        new ExecutionAttemptID(), new 
LocalTaskManagerLocation()),
+                "task name");
+    }
+
+    private static class TestAccessExecution implements AccessExecution {
+
+        private final ExecutionAttemptID executionAttemptID;
+        private final ErrorInfo failureInfo;
+        private final TaskManagerLocation taskManagerLocation;
+
+        private static TestAccessExecution createExecutionWithoutFailure(
+                ExecutionAttemptID executionAttemptID, TaskManagerLocation 
taskManagerLocation) {
+            return new TestAccessExecution(executionAttemptID, null, 0L, 
taskManagerLocation);
+        }
+
+        private TestAccessExecution(
+                ExecutionAttemptID executionAttemptID,
+                @Nullable Throwable failure,
+                long timestamp,
+                TaskManagerLocation taskManagerLocation) {
+            this.executionAttemptID = executionAttemptID;
+            this.failureInfo = failure == null ? null : new ErrorInfo(failure, 
timestamp);
+            this.taskManagerLocation = taskManagerLocation;
+        }
+
+        @Override
+        public ExecutionAttemptID getAttemptId() {
+            return executionAttemptID;
+        }
+
+        @Override
+        public TaskManagerLocation getAssignedResourceLocation() {
+            return taskManagerLocation;
+        }
+
+        @Override
+        public Optional<ErrorInfo> getFailureInfo() {
+            return Optional.ofNullable(failureInfo);
+        }
+
+        // -- unsupported methods
+
+        @Override
+        public int getAttemptNumber() {
+            throw new UnsupportedOperationException("getAttemptNumber should 
not be called.");
+        }
+
+        @Override
+        public long[] getStateTimestamps() {
+            throw new UnsupportedOperationException("getStateTimestamps should 
not be called.");
+        }
+
+        @Override
+        public ExecutionState getState() {
+            throw new UnsupportedOperationException("getState should not be 
called.");
+        }
+
+        @Override
+        public long getStateTimestamp(ExecutionState state) {
+            throw new UnsupportedOperationException("getStateTimestamp should 
not be called.");
+        }
+
+        @Override
+        public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() 
{
+            throw new UnsupportedOperationException(
+                    "getUserAccumulatorsStringified should not be called.");
+        }
+
+        @Override
+        public int getParallelSubtaskIndex() {
+            throw new UnsupportedOperationException(
+                    "getParallelSubtaskIndex should not be called.");
+        }
+
+        @Override
+        public IOMetrics getIOMetrics() {
+            throw new UnsupportedOperationException("getIOMetrics should not 
be called.");
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
new file mode 100644
index 0000000..e541164
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.hamcrest.core.IsInstanceOf;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * {@code FailureHandlingResultSnapshotTest} tests the creation of {@link
+ * FailureHandlingResultSnapshot}.
+ */
+public class FailureHandlingResultSnapshotTest extends TestLogger {
+
+    private ExecutionGraph executionGraph;
+
+    @Before
+    public void setup() throws JobException, JobExecutionException {
+        final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
+        jobGraph.getVertices().forEach(v -> v.setParallelism(3));
+
+        executionGraph =
+                
TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
+        
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testRootCauseVertexNotFailed() {
+        final ExecutionVertex rootCauseExecutionVertex = 
extractExecutionVertex(0);
+        final FailureHandlingResult failureHandlingResult =
+                FailureHandlingResult.restartable(
+                        rootCauseExecutionVertex.getID(),
+                        new RuntimeException("Expected exception: root cause"),
+                        System.currentTimeMillis(),
+                        StreamSupport.stream(
+                                        
executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getID)
+                                .collect(Collectors.toSet()),
+                        0L,
+                        false);
+
+        FailureHandlingResultSnapshot.create(failureHandlingResult, 
this::getLatestExecution);
+    }
+
+    @Test // see FLINK-22060/FLINK-21376
+    public void testMissingThrowableHandling() {
+        final ExecutionVertex rootCauseExecutionVertex = 
extractExecutionVertex(0);
+
+        final long rootCauseTimestamp = 
triggerFailure(rootCauseExecutionVertex, null);
+
+        final FailureHandlingResult failureHandlingResult =
+                FailureHandlingResult.restartable(
+                        rootCauseExecutionVertex.getID(),
+                        null,
+                        rootCauseTimestamp,
+                        StreamSupport.stream(
+                                        
executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getID)
+                                .collect(Collectors.toSet()),
+                        0L,
+                        false);
+
+        final FailureHandlingResultSnapshot testInstance =
+                FailureHandlingResultSnapshot.create(
+                        failureHandlingResult, this::getLatestExecution);
+
+        final Throwable actualException =
+                new SerializedThrowable(testInstance.getRootCause())
+                        .deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, 
IsInstanceOf.instanceOf(FlinkException.class));
+        assertThat(
+                actualException,
+                
FlinkMatchers.containsMessage(ErrorInfo.handleMissingThrowable(null).getMessage()));
+        assertThat(testInstance.getTimestamp(), is(rootCauseTimestamp));
+        assertThat(testInstance.getRootCauseExecution().isPresent(), is(true));
+        assertThat(
+                testInstance.getRootCauseExecution().get(),
+                is(rootCauseExecutionVertex.getCurrentExecutionAttempt()));
+    }
+
+    @Test
+    public void testLocalFailureHandlingResultSnapshotCreation() {
+        final ExecutionVertex rootCauseExecutionVertex = 
extractExecutionVertex(0);
+        final Throwable rootCause = new RuntimeException("Expected exception: 
root cause");
+        final ExecutionVertex otherFailedExecutionVertex = 
extractExecutionVertex(1);
+        final Throwable otherFailure =
+                new IllegalStateException("Expected exception: other failure");
+
+        final long rootCauseTimestamp = 
triggerFailure(rootCauseExecutionVertex, rootCause);
+        triggerFailure(otherFailedExecutionVertex, otherFailure);
+
+        final FailureHandlingResult failureHandlingResult =
+                FailureHandlingResult.restartable(
+                        rootCauseExecutionVertex.getID(),
+                        rootCause,
+                        rootCauseTimestamp,
+                        StreamSupport.stream(
+                                        
executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getID)
+                                .collect(Collectors.toSet()),
+                        0L,
+                        false);
+
+        final FailureHandlingResultSnapshot testInstance =
+                FailureHandlingResultSnapshot.create(
+                        failureHandlingResult, this::getLatestExecution);
+
+        assertThat(testInstance.getRootCause(), is(rootCause));
+        assertThat(testInstance.getTimestamp(), is(rootCauseTimestamp));
+        assertThat(testInstance.getRootCauseExecution().isPresent(), is(true));
+        assertThat(
+                testInstance.getRootCauseExecution().get(),
+                is(rootCauseExecutionVertex.getCurrentExecutionAttempt()));
+
+        assertThat(
+                testInstance.getConcurrentlyFailedExecution(),
+                IsIterableContainingInOrder.contains(
+                        
otherFailedExecutionVertex.getCurrentExecutionAttempt()));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void 
testFailureHandlingWithRootCauseExecutionBeingPartOfConcurrentlyFailedExecutions()
 {
+        final Execution rootCauseExecution = 
extractExecutionVertex(0).getCurrentExecutionAttempt();
+        new FailureHandlingResultSnapshot(
+                rootCauseExecution,
+                new RuntimeException("Expected exception"),
+                System.currentTimeMillis(),
+                Collections.singleton(rootCauseExecution));
+    }
+
+    @Test
+    public void testGlobalFailureHandlingResultSnapshotCreation() {
+        final Throwable rootCause = new FlinkException("Expected exception: 
root cause");
+        final long timestamp = System.currentTimeMillis();
+
+        final ExecutionVertex failedExecutionVertex0 = 
extractExecutionVertex(0);
+        final Throwable failure0 = new RuntimeException("Expected exception: 
failure #0");
+        final ExecutionVertex failedExecutionVertex1 = 
extractExecutionVertex(1);
+        final Throwable failure1 = new IllegalStateException("Expected 
exception: failure #1");
+
+        triggerFailure(failedExecutionVertex0, failure0);
+        triggerFailure(failedExecutionVertex1, failure1);
+
+        final FailureHandlingResult failureHandlingResult =
+                FailureHandlingResult.restartable(
+                        null,
+                        rootCause,
+                        timestamp,
+                        StreamSupport.stream(
+                                        
executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getID)
+                                .collect(Collectors.toSet()),
+                        0L,
+                        true);
+
+        final FailureHandlingResultSnapshot testInstance =
+                FailureHandlingResultSnapshot.create(
+                        failureHandlingResult, this::getLatestExecution);
+
+        assertThat(testInstance.getRootCause(), is(rootCause));
+        assertThat(testInstance.getTimestamp(), is(timestamp));
+        assertThat(testInstance.getRootCauseExecution().isPresent(), 
is(false));
+
+        assertThat(
+                testInstance.getConcurrentlyFailedExecution(),
+                IsIterableContainingInAnyOrder.containsInAnyOrder(
+                        failedExecutionVertex0.getCurrentExecutionAttempt(),
+                        failedExecutionVertex1.getCurrentExecutionAttempt()));
+    }
+
+    private Execution getLatestExecution(ExecutionVertexID executionVertexId) {
+        if 
(!executionGraph.getAllVertices().containsKey(executionVertexId.getJobVertexId()))
 {
+            throw new IllegalArgumentException(
+                    "The ExecutionJobVertex having the ID "
+                            + executionVertexId.getJobVertexId()
+                            + " does not exist.");
+        }
+
+        final ExecutionVertex[] executions =
+                executionGraph
+                        .getAllVertices()
+                        .get(executionVertexId.getJobVertexId())
+                        .getTaskVertices();
+
+        if (executions.length <= executionVertexId.getSubtaskIndex()) {
+            throw new IllegalArgumentException(
+                    "The ExecutionVertex having the subtask ID "
+                            + executionVertexId.getSubtaskIndex()
+                            + " for ExecutionJobVertex "
+                            + executionVertexId.getJobVertexId()
+                            + " does not exist.");
+        }
+
+        return 
executions[executionVertexId.getSubtaskIndex()].getCurrentExecutionAttempt();
+    }
+
+    private long triggerFailure(ExecutionVertex executionVertex, Throwable 
throwable) {
+        executionGraph.updateState(
+                new TaskExecutionStateTransition(
+                        new TaskExecutionState(
+                                
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+                                ExecutionState.FAILED,
+                                throwable)));
+
+        return executionVertex
+                .getFailureInfo()
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        "The transition into failed state 
didn't succeed for ExecutionVertex "
+                                                + executionVertex.getID()
+                                                + "."))
+                .getTimestamp();
+    }
+
+    private ExecutionVertex extractExecutionVertex(int pos) {
+        final ExecutionVertex executionVertex =
+                Iterables.get(executionGraph.getAllExecutionVertices(), pos);
+        executionVertex.tryAssignResource(
+                new TestingLogicalSlotBuilder().createTestingLogicalSlot());
+
+        return executionVertex;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
similarity index 65%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractorTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
index 844d9b9..10821cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
@@ -28,29 +28,27 @@ import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.junit.Assert.assertThat;
 
-/** {@code ExceptionHistoryEntryExtractorTest} tests {@link ExceptionHistoryEntryExtractor}. */
-public class ExceptionHistoryEntryExtractorTest extends TestLogger {
-
-    private final ExceptionHistoryEntryExtractor testInstance =
-            new ExceptionHistoryEntryExtractor();
+/**
+ * {@code RootExceptionHistoryEntryTest} tests the creation of {@link RootExceptionHistoryEntry}.
+ */
+public class RootExceptionHistoryEntryTest extends TestLogger {
 
     private ExecutionGraph executionGraph;
 
@@ -64,65 +62,8 @@ public class ExceptionHistoryEntryExtractorTest extends TestLogger {
         executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testWrongExecutionVertexIdOfFailingVertex() {
-        testInstance.extractLocalFailure(
-                executionGraph.getAllVertices(),
-                new ExecutionVertexID(new JobVertexID(), 0),
-                Collections.emptyList());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testWrongSubtaskIndexOfFailingVertex() {
-        final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0);
-        triggerFailure(rootExecutionVertex, new RuntimeException("Expected root cause"));
-        testInstance.extractLocalFailure(
-                executionGraph.getAllVertices(),
-                new ExecutionVertexID(
-                        rootExecutionVertex.getID().getJobVertexId(), Integer.MAX_VALUE),
-                Collections.emptyList());
-    }
-
-    @Ignore // disabled due to FLINK-22276 workaround in ExceptionHistoryEntryExtractor
-    @Test(expected = IllegalArgumentException.class)
-    public void testRootExecutionVertexIdNotFailed() {
-        final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0);
-        testInstance.extractLocalFailure(
-                executionGraph.getAllVertices(),
-                rootExecutionVertex.getID(),
-                Collections.emptyList());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testWrongExecutionVertexIdOfConcurrentlyFailedVertex() {
-        final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0);
-        triggerFailure(rootExecutionVertex, new RuntimeException("Expected root cause"));
-        testInstance.extractLocalFailure(
-                executionGraph.getAllVertices(),
-                rootExecutionVertex.getID(),
-                Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0)));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testWrongSubtaskIndexOfConcurrentlyFailedVertex() {
-        final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0);
-        triggerFailure(rootExecutionVertex, new RuntimeException("Expected root cause"));
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        triggerFailure(
-                concurrentlyFailedExecutionVertex,
-                new RuntimeException("Expected concurrent failure"));
-
-        testInstance.extractLocalFailure(
-                executionGraph.getAllVertices(),
-                rootExecutionVertex.getID(),
-                Collections.singleton(
-                        new ExecutionVertexID(
-                                concurrentlyFailedExecutionVertex.getJobvertexId(),
-                                Integer.MAX_VALUE)));
-    }
-
     @Test
-    public void extractLocalFailure() {
+    public void testFromFailureHandlingResultSnapshot() {
         final Throwable rootException = new RuntimeException("Expected root failure");
         final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0);
         final long rootTimestamp = triggerFailure(rootExecutionVertex, rootException);
@@ -132,15 +73,15 @@ public class ExceptionHistoryEntryExtractorTest extends TestLogger {
         final long concurrentExceptionTimestamp =
                 triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
 
-        final ExecutionVertex notFailedExecutionVertex = extractExecutionVertex(2);
-
+        final FailureHandlingResultSnapshot snapshot =
+                new FailureHandlingResultSnapshot(
+                        rootExecutionVertex.getCurrentExecutionAttempt(),
+                        rootException,
+                        rootTimestamp,
+                        Collections.singleton(
+                                concurrentlyFailedExecutionVertex.getCurrentExecutionAttempt()));
         final RootExceptionHistoryEntry actualEntry =
-                testInstance.extractLocalFailure(
-                        executionGraph.getAllVertices(),
-                        rootExecutionVertex.getID(),
-                        Arrays.asList(
-                                concurrentlyFailedExecutionVertex.getID(),
-                                notFailedExecutionVertex.getID()));
+                RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(snapshot);
 
         assertThat(
                 actualEntry,
@@ -161,7 +102,7 @@ public class ExceptionHistoryEntryExtractorTest extends TestLogger {
     }
 
     @Test
-    public void extractGlobalFailure() {
+    public void testFromGlobalFailure() {
         final Throwable concurrentException0 =
                 new RuntimeException("Expected concurrent failure #0");
         final ExecutionVertex concurrentlyFailedExecutionVertex0 = extractExecutionVertex(0);
@@ -177,15 +118,21 @@ public class ExceptionHistoryEntryExtractorTest extends TestLogger {
         final Throwable rootCause = new Exception("Expected root failure");
         final long rootTimestamp = System.currentTimeMillis();
         final RootExceptionHistoryEntry actualEntry =
-                testInstance.extractGlobalFailure(
-                        executionGraph.getAllExecutionVertices(), rootCause, rootTimestamp);
+                RootExceptionHistoryEntry.fromGlobalFailure(
+                        rootCause,
+                        rootTimestamp,
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .collect(Collectors.toSet()));
 
         assertThat(
                 actualEntry,
                 ExceptionHistoryEntryMatcher.matchesGlobalFailure(rootCause, rootTimestamp));
         assertThat(
                 actualEntry.getConcurrentExceptions(),
-                IsIterableContainingInOrder.contains(
+                IsIterableContainingInAnyOrder.containsInAnyOrder(
                         ExceptionHistoryEntryMatcher.matchesFailure(
                                 concurrentException0,
                                 concurrentExceptionTimestamp0,

Reply via email to