tillrohrmann commented on a change in pull request #15640:
URL: https://github.com/apache/flink/pull/15640#discussion_r615705598



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -39,65 +40,77 @@
  */
 public class FailureHandlingResultSnapshot {
 
-    private final ExecutionAttemptID rootCauseExecution;
+    private final Execution rootCauseExecution;
     private final Throwable rootCause;
     private final long timestamp;
-    private final Map<ExecutionAttemptID, Execution> failedExecutions;
+    private final Set<Execution> concurrentlyFailedExecutions;
 
     /**
      * Creates a {@code FailureHandlingResultSnapshot} based on the passed 
{@link
      * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
      *
      * @param failureHandlingResult The {@code FailureHandlingResult} that is 
used for extracting
      *     the failure information.
-     * @param executionVertices The {@code ExecutionVertices} that shall be 
scanned for failures
-     *     based on the passed {@code FailureHandlingResult}.
+     * @param executionVertexLookup The look-up function for retrieving the 
actual {@link
+     *     ExecutionVertex} instances for a given {@link ExecutionAttemptID}.
      * @return The {@code FailureHandlingResultSnapshot}.
      */
     public static FailureHandlingResultSnapshot create(
             FailureHandlingResult failureHandlingResult,
-            Iterable<ExecutionVertex> executionVertices) {
-        final ExecutionVertexID rootCauseExecutionVertexId =
-                
failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null);
-
-        ExecutionAttemptID rootCauseExecutionAttemptId = null;
-        final Map<ExecutionAttemptID, Execution> failedExecutions = new 
HashMap<>();
-        for (ExecutionVertex executionVertex : executionVertices) {
-            if (executionVertex.getID().equals(rootCauseExecutionVertexId)) {
-                Preconditions.checkArgument(
-                        
executionVertex.getCurrentExecutionAttempt().getFailureInfo().isPresent(),
-                        String.format(
-                                "The execution %s didn't provide a failure 
info even though the corresponding ExecutionVertex %s is marked as having 
handled the root cause of this failure.",
-                                
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
-                                executionVertex.getID()));
-                rootCauseExecutionAttemptId =
-                        
executionVertex.getCurrentExecutionAttempt().getAttemptId();
-            }
-
-            final Execution execution = 
executionVertex.getCurrentExecutionAttempt();
-            if 
(failureHandlingResult.getVerticesToRestart().contains(executionVertex.getID())
-                    && execution.getFailureInfo().isPresent()) {
-                failedExecutions.put(execution.getAttemptId(), execution);
-            }
-        }
+            Function<ExecutionVertexID, ExecutionVertex> 
executionVertexLookup) {

Review comment:
       An observation: If we change this to `Function<ExecutionVertexID, 
Execution>` then we don't have to expose the `ExecutionVertex` to this method.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
##########
@@ -39,7 +42,40 @@
     @Nullable private final String failingTaskName;
     @Nullable private final ArchivedTaskManagerLocation taskManagerLocation;
 
-    ExceptionHistoryEntry(
+    /**
+     * Creates an {@code ExceptionHistoryEntry} based on the provided {@code 
Execution}.
+     *
+     * @param failedExecution the failed {@code Execution}.
+     * @param taskName the name of the task.
+     * @return The {@code ExceptionHistoryEntry}.
+     * @throws IllegalArgumentException if the passed {@code Execution} does 
not provide a {@link
+     *     Execution#getFailureInfo() failureInfo}.
+     */
+    public static ExceptionHistoryEntry create(AccessExecution 
failedExecution, String taskName) {
+        Preconditions.checkArgument(
+                failedExecution.getFailureInfo().isPresent(),
+                "The selected Execution " + failedExecution.getAttemptId() + " 
didn't fail.");
+
+        final ErrorInfo failure = failedExecution.getFailureInfo().get();
+        return new ExceptionHistoryEntry(
+                failure.getException(),
+                failure.getTimestamp(),
+                taskName,
+                failedExecution.getAssignedResourceLocation());
+    }
+
+    /**
+     * Instantiates a {@code ExceptionHistoryEntry}.
+     *
+     * @param cause The reason for the failure.
+     * @param timestamp The time the failure was caught.
+     * @param failingTaskName The name of the task that failed.
+     * @param taskManagerLocation The host the task was running on.
+     * @throws NullPointerException if {@code failure} is {@code null}.

Review comment:
       `failure` => `cause` (the `@throws` text should reference the actual parameter name).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link 
FailureHandlingResult}
+ * providing the actual {@link Execution Executions}.
+ */
+public class FailureHandlingResultSnapshot {
+
+    @Nullable private final Execution rootCauseExecution;
+    private final Throwable rootCause;
+    private final long timestamp;
+    private final Set<Execution> concurrentlyFailedExecutions;
+
+    /**
+     * Creates a {@code FailureHandlingResultSnapshot} based on the passed 
{@link
+     * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
+     *
+     * @param failureHandlingResult The {@code FailureHandlingResult} that is 
used for extracting
+     *     the failure information.
+     * @param executionVertexLookup The look-up function for retrieving the 
actual {@link
+     *     ExecutionVertex} instances for a given {@link ExecutionAttemptID}.
+     * @return The {@code FailureHandlingResultSnapshot}.
+     */
+    public static FailureHandlingResultSnapshot create(
+            FailureHandlingResult failureHandlingResult,
+            Function<ExecutionVertexID, ExecutionVertex> 
executionVertexLookup) {
+        final Execution rootCauseExecution =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(executionVertexLookup)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .orElse(null);
+        Preconditions.checkArgument(
+                rootCauseExecution == null || 
rootCauseExecution.getFailureInfo().isPresent(),
+                String.format(
+                        "The execution %s didn't provide a failure info even 
though the corresponding ExecutionVertex %s is marked as having handled the 
root cause of this failure.",
+                        // the "(null)" values should never be used due to the 
condition - it's just
+                        // added to make the compiler happy
+                        rootCauseExecution != null ? 
rootCauseExecution.getAttemptId() : "(null)",
+                        failureHandlingResult
+                                .getExecutionVertexIdOfFailedTask()
+                                .map(Objects::toString)
+                                .orElse("(null)")));
+
+        final Set<Execution> concurrentlyFailedExecutions =
+                failureHandlingResult.getVerticesToRestart().stream()
+                        .filter(
+                                e ->
+                                        !e.equals(
+                                                failureHandlingResult
+                                                        
.getExecutionVertexIdOfFailedTask()
+                                                        .orElse(null)))
+                        .map(executionVertexLookup)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .filter(e -> e.getFailureInfo().isPresent())
+                        .collect(Collectors.toSet());
+
+        return new FailureHandlingResultSnapshot(
+                rootCauseExecution,
+                
ErrorInfo.handleMissingThrowable(failureHandlingResult.getError()),
+                failureHandlingResult.getTimestamp(),
+                concurrentlyFailedExecutions);
+    }
+
+    @VisibleForTesting
+    FailureHandlingResultSnapshot(
+            @Nullable Execution rootCauseExecution,
+            Throwable rootCause,
+            long timestamp,
+            Set<Execution> concurrentlyFailedExecutions) {
+        Preconditions.checkArgument(
+                rootCauseExecution == null
+                        || 
!concurrentlyFailedExecutions.contains(rootCauseExecution),
+                "The rootCauseExecution should not be part of the 
concurrentlyFailedExecutions map.");
+
+        this.rootCauseExecution = rootCauseExecution;
+        this.rootCause = Preconditions.checkNotNull(rootCause);
+        this.timestamp = timestamp;
+        this.concurrentlyFailedExecutions =
+                Preconditions.checkNotNull(concurrentlyFailedExecutions);
+    }
+
+    /**
+     * Returns the {@link Execution} that handled the root cause for this 
failure. An empty {@code
+     * Optional} will be returned if it's a global failure.
+     *
+     * @return The {@link Execution} that handled the root cause for this 
failure.
+     */
+    public Optional<Execution> getRootCauseExecution() {
+        return Optional.ofNullable(rootCauseExecution);
+    }
+
+    /**
+     * The actual failure that is handled.
+     *
+     * @return The {@code Throwable}.
+     */
+    public Throwable getRootCause() {
+        return rootCause;
+    }
+
+    /**
+     * The time the failure occurred.
+     *
+     * @return The time of the failure.
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * All {@link Execution Executions} that failed and are planned to be 
restarted as part of this
+     * failure handling.
+     *
+     * @return The concurrently failed {@code Executions}.
+     */
+    public Iterable<Execution> getConcurrentlyFailedExecution() {
+        return concurrentlyFailedExecutions;

Review comment:
       I would suggest returning 
`Collections.unmodifiableSet(concurrentlyFailedExecutions)`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -39,65 +40,77 @@
  */
 public class FailureHandlingResultSnapshot {
 
-    private final ExecutionAttemptID rootCauseExecution;
+    private final Execution rootCauseExecution;
     private final Throwable rootCause;
     private final long timestamp;
-    private final Map<ExecutionAttemptID, Execution> failedExecutions;
+    private final Set<Execution> concurrentlyFailedExecutions;
 
     /**
      * Creates a {@code FailureHandlingResultSnapshot} based on the passed 
{@link
      * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
      *
      * @param failureHandlingResult The {@code FailureHandlingResult} that is 
used for extracting
      *     the failure information.
-     * @param executionVertices The {@code ExecutionVertices} that shall be 
scanned for failures
-     *     based on the passed {@code FailureHandlingResult}.
+     * @param executionVertexLookup The look-up function for retrieving the 
actual {@link
+     *     ExecutionVertex} instances for a given {@link ExecutionAttemptID}.
      * @return The {@code FailureHandlingResultSnapshot}.
      */
     public static FailureHandlingResultSnapshot create(
             FailureHandlingResult failureHandlingResult,
-            Iterable<ExecutionVertex> executionVertices) {
-        final ExecutionVertexID rootCauseExecutionVertexId =
-                
failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null);
-
-        ExecutionAttemptID rootCauseExecutionAttemptId = null;
-        final Map<ExecutionAttemptID, Execution> failedExecutions = new 
HashMap<>();
-        for (ExecutionVertex executionVertex : executionVertices) {
-            if (executionVertex.getID().equals(rootCauseExecutionVertexId)) {
-                Preconditions.checkArgument(
-                        
executionVertex.getCurrentExecutionAttempt().getFailureInfo().isPresent(),
-                        String.format(
-                                "The execution %s didn't provide a failure 
info even though the corresponding ExecutionVertex %s is marked as having 
handled the root cause of this failure.",
-                                
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
-                                executionVertex.getID()));
-                rootCauseExecutionAttemptId =
-                        
executionVertex.getCurrentExecutionAttempt().getAttemptId();
-            }
-
-            final Execution execution = 
executionVertex.getCurrentExecutionAttempt();
-            if 
(failureHandlingResult.getVerticesToRestart().contains(executionVertex.getID())
-                    && execution.getFailureInfo().isPresent()) {
-                failedExecutions.put(execution.getAttemptId(), execution);
-            }
-        }
+            Function<ExecutionVertexID, ExecutionVertex> 
executionVertexLookup) {

Review comment:
       We could then name it `latestExecutionLookup` or so.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
##########
@@ -115,7 +156,12 @@ public void 
testLocalFailureHandlingResultSnapshotCreation() {
                 FailureHandlingResultSnapshot.create(
                         failureHandlingResult, this::getExecutionVertex);
 
-        assertThat(testInstance.getRootCause(), is(rootCause));
+        // SerializableThrowable needs to be wrapped around to expose the 
deserializeError method -
+        // the check would fail, otherwise.
+        assertThat(
+                new SerializedThrowable(testInstance.getRootCause())
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(rootCause));

Review comment:
       The test also passes without this wrapping.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -39,65 +40,77 @@
  */
 public class FailureHandlingResultSnapshot {
 
-    private final ExecutionAttemptID rootCauseExecution;
+    private final Execution rootCauseExecution;
     private final Throwable rootCause;
     private final long timestamp;
-    private final Map<ExecutionAttemptID, Execution> failedExecutions;
+    private final Set<Execution> concurrentlyFailedExecutions;
 
     /**
      * Creates a {@code FailureHandlingResultSnapshot} based on the passed 
{@link
      * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
      *
      * @param failureHandlingResult The {@code FailureHandlingResult} that is 
used for extracting
      *     the failure information.
-     * @param executionVertices The {@code ExecutionVertices} that shall be 
scanned for failures
-     *     based on the passed {@code FailureHandlingResult}.
+     * @param executionVertexLookup The look-up function for retrieving the 
actual {@link
+     *     ExecutionVertex} instances for a given {@link ExecutionAttemptID}.
      * @return The {@code FailureHandlingResultSnapshot}.
      */
     public static FailureHandlingResultSnapshot create(
             FailureHandlingResult failureHandlingResult,
-            Iterable<ExecutionVertex> executionVertices) {
-        final ExecutionVertexID rootCauseExecutionVertexId =
-                
failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null);
-
-        ExecutionAttemptID rootCauseExecutionAttemptId = null;
-        final Map<ExecutionAttemptID, Execution> failedExecutions = new 
HashMap<>();
-        for (ExecutionVertex executionVertex : executionVertices) {
-            if (executionVertex.getID().equals(rootCauseExecutionVertexId)) {
-                Preconditions.checkArgument(
-                        
executionVertex.getCurrentExecutionAttempt().getFailureInfo().isPresent(),
-                        String.format(
-                                "The execution %s didn't provide a failure 
info even though the corresponding ExecutionVertex %s is marked as having 
handled the root cause of this failure.",
-                                
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
-                                executionVertex.getID()));
-                rootCauseExecutionAttemptId =
-                        
executionVertex.getCurrentExecutionAttempt().getAttemptId();
-            }
-
-            final Execution execution = 
executionVertex.getCurrentExecutionAttempt();
-            if 
(failureHandlingResult.getVerticesToRestart().contains(executionVertex.getID())
-                    && execution.getFailureInfo().isPresent()) {
-                failedExecutions.put(execution.getAttemptId(), execution);
-            }
-        }
+            Function<ExecutionVertexID, ExecutionVertex> 
executionVertexLookup) {
+        final Execution rootCauseExecution =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(executionVertexLookup)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .orElse(null);
+        Preconditions.checkArgument(
+                rootCauseExecution == null || 
rootCauseExecution.getFailureInfo().isPresent(),
+                String.format(
+                        "The execution %s didn't provide a failure info even 
though the corresponding ExecutionVertex %s is marked as having handled the 
root cause of this failure.",
+                        // the "(null)" values should never be used due to the 
condition - it's just
+                        // added to make the compiler happy
+                        rootCauseExecution != null ? 
rootCauseExecution.getAttemptId() : "(null)",
+                        failureHandlingResult
+                                .getExecutionVertexIdOfFailedTask()
+                                .map(Objects::toString)
+                                .orElse("(null)")));
+
+        final Set<Execution> concurrentlyFailedExecutions =
+                failureHandlingResult.getVerticesToRestart().stream()
+                        .filter(
+                                e ->
+                                        !e.equals(
+                                                failureHandlingResult
+                                                        
.getExecutionVertexIdOfFailedTask()
+                                                        .orElse(null)))

Review comment:
       Maybe give this expression a name like `rootCauseExecutionVertex` or so.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
##########
@@ -126,6 +128,16 @@ public void 
testLocalFailureHandlingResultSnapshotCreation() {
                         
otherFailedExecutionVertex.getCurrentExecutionAttempt()));
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testRootCauseExecutionNotPartOfMap() {

Review comment:
       The name of the test is misleading. Maybe 
`testFailureHandlingResultSnapshotFailsIfRootCauseExecutionIsPartOfConcurrentFailures()`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link 
FailureHandlingResult}
+ * providing the actual {@link Execution Executions}.
+ */
+public class FailureHandlingResultSnapshot {
+
+    @Nullable private final Execution rootCauseExecution;
+    private final Throwable rootCause;
+    private final long timestamp;
+    private final Set<Execution> concurrentlyFailedExecutions;
+
+    /**
+     * Creates a {@code FailureHandlingResultSnapshot} based on the passed 
{@link
+     * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
+     *
+     * @param failureHandlingResult The {@code FailureHandlingResult} that is 
used for extracting
+     *     the failure information.
+     * @param executionVertexLookup The look-up function for retrieving the 
actual {@link
+     *     ExecutionVertex} instances for a given {@link ExecutionAttemptID}.
+     * @return The {@code FailureHandlingResultSnapshot}.
+     */
+    public static FailureHandlingResultSnapshot create(
+            FailureHandlingResult failureHandlingResult,
+            Function<ExecutionVertexID, ExecutionVertex> 
executionVertexLookup) {
+        final Execution rootCauseExecution =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(executionVertexLookup)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .orElse(null);
+        Preconditions.checkArgument(
+                rootCauseExecution == null || 
rootCauseExecution.getFailureInfo().isPresent(),
+                String.format(
+                        "The execution %s didn't provide a failure info even 
though the corresponding ExecutionVertex %s is marked as having handled the 
root cause of this failure.",
+                        // the "(null)" values should never be used due to the 
condition - it's just
+                        // added to make the compiler happy
+                        rootCauseExecution != null ? 
rootCauseExecution.getAttemptId() : "(null)",
+                        failureHandlingResult
+                                .getExecutionVertexIdOfFailedTask()
+                                .map(Objects::toString)
+                                .orElse("(null)")));
+
+        final Set<Execution> concurrentlyFailedExecutions =
+                failureHandlingResult.getVerticesToRestart().stream()
+                        .filter(
+                                e ->
+                                        !e.equals(
+                                                failureHandlingResult
+                                                        
.getExecutionVertexIdOfFailedTask()
+                                                        .orElse(null)))
+                        .map(executionVertexLookup)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .filter(e -> e.getFailureInfo().isPresent())
+                        .collect(Collectors.toSet());
+
+        return new FailureHandlingResultSnapshot(
+                rootCauseExecution,
+                
ErrorInfo.handleMissingThrowable(failureHandlingResult.getError()),

Review comment:
       Maybe add a `@Nullable` annotation to the `getError()` method as a hotfix.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to