tillrohrmann commented on a change in pull request #15640:
URL: https://github.com/apache/flink/pull/15640#discussion_r614687352
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -1126,6 +1133,94 @@ public void testExceptionHistoryWithRestartableFailure()
{
failingException,
updateStateTriggeringJobFailureTimestamp)));
}
+ @Ignore
+ @Test
+ public void testExceptionHistoryConcurrentRestart() throws Exception {
+ final JobGraph jobGraph = singleJobVertexJobGraph(2);
+
+ final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
+ final TestingLogicalSlotBuilder logicalSlotBuilder = new
TestingLogicalSlotBuilder();
+ logicalSlotBuilder.setTaskManagerLocation(taskManagerLocation);
+
+ executionSlotAllocatorFactory = new
TestExecutionSlotAllocatorFactory(logicalSlotBuilder);
+
+ final ReorganizableManuallyTriggeredScheduledExecutor delayExecutor =
+ new ReorganizableManuallyTriggeredScheduledExecutor();
+ final TestFailoverStrategyFactory failoverStrategyFactory =
+ new TestFailoverStrategyFactory();
+ final DefaultScheduler scheduler =
+ createScheduler(
+ jobGraph,
+
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ new PipelinedRegionSchedulingStrategy.Factory(),
+ failoverStrategyFactory,
+ delayExecutor);
+ scheduler.startScheduling();
+
+ final ExecutionVertex executionVertex0 =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0);
+ final ExecutionVertex executionVertex1 =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 1);
+
+ // single-ExecutionVertex failure
+ final RuntimeException exception0 = new RuntimeException("failure #0");
+ failoverStrategyFactory.setTasksToRestart(executionVertex0.getID());
+ final long updateStateTriggeringRestartTimestamp0 =
+ initiateFailure(
+ scheduler,
+
executionVertex0.getCurrentExecutionAttempt().getAttemptId(),
+ exception0);
+
+ // multi-ExecutionVertex failure
+ final RuntimeException exception1 = new RuntimeException("failure #1");
+ failoverStrategyFactory.setTasksToRestart(
+ executionVertex1.getID(), executionVertex0.getID());
+ final long updateStateTriggeringRestartTimestamp1 =
+ initiateFailure(
+ scheduler,
+
executionVertex1.getCurrentExecutionAttempt().getAttemptId(),
+ exception1);
+
+ // there might be a race condition with the delayExecutor if the tasks
are scheduled quite
+ // close to each other which we want to simulate here
+ Collections.reverse(delayExecutor.getCollectedScheduledTasks());
+
+ delayExecutor.triggerNonPeriodicScheduledTasks();
Review comment:
We don't have to change it right now but the test relies quite a bit on
internal implementation details. To give you an example: If we decide to change
how restarting tasks is internally handled, then we will break this test.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link
FailureHandlingResult}
+ * providing the actual {@link Execution Executions}.
+ */
+public class FailureHandlingResultSnapshot {
+
+ private final ExecutionAttemptID rootCauseExecution;
+ private final Throwable rootCause;
+ private final long timestamp;
+ private final Map<ExecutionAttemptID, Execution> failedExecutions;
+
+ /**
+ * Creates a {@code FailureHandlingResultSnapshot} based on the passed
{@link
+ * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
+ *
+ * @param failureHandlingResult The {@code FailureHandlingResult} that is
used for extracting
+ * the failure information.
+ * @param executionVertices The {@code ExecutionVertices} that shall be
scanned for failures
+ * based on the passed {@code FailureHandlingResult}.
+ * @return The {@code FailureHandlingResultSnapshot}.
+ */
+ public static FailureHandlingResultSnapshot create(
+ FailureHandlingResult failureHandlingResult,
+ Iterable<ExecutionVertex> executionVertices) {
+ final ExecutionVertexID rootCauseExecutionVertexId =
+
failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null);
+
+ ExecutionAttemptID rootCauseExecutionAttemptId = null;
+ final Map<ExecutionAttemptID, Execution> failedExecutions = new
HashMap<>();
+ for (ExecutionVertex executionVertex : executionVertices) {
+ if (executionVertex.getID().equals(rootCauseExecutionVertexId)) {
+ Preconditions.checkArgument(
+
executionVertex.getCurrentExecutionAttempt().getFailureInfo().isPresent(),
+ String.format(
+ "The execution %s didn't provide a failure
info even though the corresponding ExecutionVertex %s is marked as having
handled the root cause of this failure.",
+
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+ executionVertex.getID()));
+ rootCauseExecutionAttemptId =
+
executionVertex.getCurrentExecutionAttempt().getAttemptId();
+ }
+
+ final Execution execution =
executionVertex.getCurrentExecutionAttempt();
+ if
(failureHandlingResult.getVerticesToRestart().contains(executionVertex.getID())
+ && execution.getFailureInfo().isPresent()) {
+ failedExecutions.put(execution.getAttemptId(), execution);
+ }
+ }
Review comment:
I don't think that this is the most efficient way to look up the
required information. If the EG is quite large and the failure region is small,
then it will be quite costly. Maybe instead of passing in an
`Iterable<ExecutionVertex>` we could pass in a lookup function
`Function<ExecutionVertexID, ExecutionVertex>` which uses the `JobVertexID` and
the `subtaskIndex` to look up the `ExecutionVertex`. This is not super
important but if this is easy to change, it might save us some CPU cycles.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
##########
@@ -36,57 +36,91 @@
private static final long serialVersionUID = -7647332765867297434L;
- private final Collection<ExceptionHistoryEntry> concurrentExceptions = new
ArrayList<>();
+ private final Iterable<ExceptionHistoryEntry> concurrentExceptions;
+
+ /**
+ * Creates a {@code RootExceptionHistoryEntry} based on the passed {@link
+ * FailureHandlingResultSnapshot}.
+ *
+ * @param snapshot The reason for the failure.
+ * @return The {@code RootExceptionHistoryEntry} instance.
+ * @throws NullPointerException if {@code cause} or {@code
failingTaskName} are {@code null}.
+ * @throws IllegalArgumentException if the {@code timestamp} of the passed
{@code
+ * FailureHandlingResult} is not bigger than {@code 0}.
+ */
+ public static RootExceptionHistoryEntry fromFailureHandlingResultSnapshot(
+ FailureHandlingResultSnapshot snapshot) {
+ String failingTaskName = null;
+ TaskManagerLocation taskManagerLocation = null;
+ if (snapshot.getRootCauseExecution().isPresent()) {
+ final Execution rootCauseExecution =
snapshot.getRootCauseExecution().get();
+ failingTaskName = rootCauseExecution.getVertexWithAttempt();
+ taskManagerLocation =
rootCauseExecution.getAssignedResourceLocation();
+ }
+
+ return new RootExceptionHistoryEntry(
+ snapshot.getRootCause(),
+ snapshot.getTimestamp(),
+ failingTaskName,
+ taskManagerLocation,
+
StreamSupport.stream(snapshot.getConcurrentlyFailedExecution().spliterator(),
false)
+ .map(
+ execution ->
+ ExceptionHistoryEntry.create(
+ execution,
execution.getVertexWithAttempt()))
+ .collect(Collectors.toSet()));
Review comment:
Instead of doing this, could we call `fromGlobalFailure` or some shared
internal creation method (e.g. `internalCreationMethodSomething`)? This part
looks quite similar to what `fromGlobalFailure` does. That way we avoid a bit
of code duplication.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link
FailureHandlingResult}
+ * providing the actual {@link Execution Executions}.
+ */
+public class FailureHandlingResultSnapshot {
+
+ private final ExecutionAttemptID rootCauseExecution;
+ private final Throwable rootCause;
+ private final long timestamp;
+ private final Map<ExecutionAttemptID, Execution> failedExecutions;
Review comment:
Same here, why do we need to index the set of failed executions?
Wouldn't `Set<Execution>` be good enough?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
##########
@@ -36,57 +36,91 @@
private static final long serialVersionUID = -7647332765867297434L;
- private final Collection<ExceptionHistoryEntry> concurrentExceptions = new
ArrayList<>();
+ private final Iterable<ExceptionHistoryEntry> concurrentExceptions;
Review comment:
Shall we make this a `Set`? This is not a strong argument, but `Iterable`
feels as if this class does not really own the `ExceptionHistoryEntries`,
whereas a `Set` or any other `Collection` makes it clearer that the class
owns them.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link
FailureHandlingResult}
+ * providing the actual {@link Execution Executions}.
+ */
+public class FailureHandlingResultSnapshot {
+
+ private final ExecutionAttemptID rootCauseExecution;
Review comment:
Why do we store the `ExecutionAttemptID` instead of the `Execution` here?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link
FailureHandlingResult}
+ * providing the actual {@link Execution Executions}.
+ */
+public class FailureHandlingResultSnapshot {
+
+ private final ExecutionAttemptID rootCauseExecution;
+ private final Throwable rootCause;
+ private final long timestamp;
+ private final Map<ExecutionAttemptID, Execution> failedExecutions;
+
+ /**
+ * Creates a {@code FailureHandlingResultSnapshot} based on the passed
{@link
+ * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
+ *
+ * @param failureHandlingResult The {@code FailureHandlingResult} that is
used for extracting
+ * the failure information.
+ * @param executionVertices The {@code ExecutionVertices} that shall be
scanned for failures
+ * based on the passed {@code FailureHandlingResult}.
+ * @return The {@code FailureHandlingResultSnapshot}.
+ */
+ public static FailureHandlingResultSnapshot create(
+ FailureHandlingResult failureHandlingResult,
+ Iterable<ExecutionVertex> executionVertices) {
+ final ExecutionVertexID rootCauseExecutionVertexId =
+
failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null);
+
+ ExecutionAttemptID rootCauseExecutionAttemptId = null;
+ final Map<ExecutionAttemptID, Execution> failedExecutions = new
HashMap<>();
+ for (ExecutionVertex executionVertex : executionVertices) {
+ if (executionVertex.getID().equals(rootCauseExecutionVertexId)) {
+ Preconditions.checkArgument(
+
executionVertex.getCurrentExecutionAttempt().getFailureInfo().isPresent(),
+ String.format(
+ "The execution %s didn't provide a failure
info even though the corresponding ExecutionVertex %s is marked as having
handled the root cause of this failure.",
+
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+ executionVertex.getID()));
+ rootCauseExecutionAttemptId =
+
executionVertex.getCurrentExecutionAttempt().getAttemptId();
+ }
+
+ final Execution execution =
executionVertex.getCurrentExecutionAttempt();
+ if
(failureHandlingResult.getVerticesToRestart().contains(executionVertex.getID())
+ && execution.getFailureInfo().isPresent()) {
+ failedExecutions.put(execution.getAttemptId(), execution);
+ }
+ }
+
+ return new FailureHandlingResultSnapshot(
+ rootCauseExecutionAttemptId,
+ failureHandlingResult.getError(),
+ failureHandlingResult.getTimestamp(),
+ failedExecutions);
+ }
+
+ @VisibleForTesting
+ FailureHandlingResultSnapshot(
+ @Nullable ExecutionAttemptID rootCauseExecution,
+ Throwable rootCause,
+ long timestamp,
+ Map<ExecutionAttemptID, Execution> failedExecutions) {
+ this.rootCauseExecution = rootCauseExecution;
+ this.rootCause = rootCause;
+ this.timestamp = timestamp;
+ this.failedExecutions = failedExecutions;
+ }
+
+ /**
+ * Returns the {@link Execution} that handled the root cause for this
failure. An empty {@code
+ * Optional} will be returned if it's a global failure.
+ *
+ * @return The {@link Execution} that handled the root cause for this
failure.
+ */
+ public Optional<Execution> getRootCauseExecution() {
+ if (rootCauseExecution == null) {
+ return Optional.empty();
+ }
+
+ return Optional.of(failedExecutions.get(rootCauseExecution));
+ }
+
+ /**
+ * The actual failure that is handled.
+ *
+ * @return The {@code Throwable}.
+ */
+ public Throwable getRootCause() {
+ return rootCause;
+ }
+
+ /**
+ * The time the failure occurred.
+ *
+ * @return The time of the failure.
+ */
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ /**
+ * All {@link Execution Executions} that failed and are planned to be
restarted as part of this
+ * failure handling.
+ *
+ * @return The concurrently failed {@code Executions}.
+ */
+ public Iterable<Execution> getConcurrentlyFailedExecution() {
+ return failedExecutions.values().stream()
+ .filter(execution ->
!execution.getAttemptId().equals(rootCauseExecution))
+ .collect(Collectors.toSet());
Review comment:
nit: This is not the most efficient way to do it, since the failed executions
are filtered and collected into a new `Set` on every call. Alternatively one
could store `rootCauseExecution` and the other failed `Executions` in separate
fields.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link
FailureHandlingResult}
+ * providing the actual {@link Execution Executions}.
+ */
+public class FailureHandlingResultSnapshot {
+
+ private final ExecutionAttemptID rootCauseExecution;
Review comment:
`@Nullable` is missing.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@code FailureHandlingResultSnapshot} creates a snapshot of a {@link
FailureHandlingResult}
+ * providing the actual {@link Execution Executions}.
+ */
+public class FailureHandlingResultSnapshot {
+
+ private final ExecutionAttemptID rootCauseExecution;
+ private final Throwable rootCause;
+ private final long timestamp;
+ private final Map<ExecutionAttemptID, Execution> failedExecutions;
+
+ /**
+ * Creates a {@code FailureHandlingResultSnapshot} based on the passed
{@link
+ * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
+ *
+ * @param failureHandlingResult The {@code FailureHandlingResult} that is
used for extracting
+ * the failure information.
+ * @param executionVertices The {@code ExecutionVertices} that shall be
scanned for failures
+ * based on the passed {@code FailureHandlingResult}.
+ * @return The {@code FailureHandlingResultSnapshot}.
+ */
+ public static FailureHandlingResultSnapshot create(
+ FailureHandlingResult failureHandlingResult,
+ Iterable<ExecutionVertex> executionVertices) {
+ final ExecutionVertexID rootCauseExecutionVertexId =
+
failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null);
+
+ ExecutionAttemptID rootCauseExecutionAttemptId = null;
+ final Map<ExecutionAttemptID, Execution> failedExecutions = new
HashMap<>();
+ for (ExecutionVertex executionVertex : executionVertices) {
+ if (executionVertex.getID().equals(rootCauseExecutionVertexId)) {
+ Preconditions.checkArgument(
+
executionVertex.getCurrentExecutionAttempt().getFailureInfo().isPresent(),
+ String.format(
+ "The execution %s didn't provide a failure
info even though the corresponding ExecutionVertex %s is marked as having
handled the root cause of this failure.",
+
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+ executionVertex.getID()));
+ rootCauseExecutionAttemptId =
+
executionVertex.getCurrentExecutionAttempt().getAttemptId();
+ }
+
+ final Execution execution =
executionVertex.getCurrentExecutionAttempt();
+ if
(failureHandlingResult.getVerticesToRestart().contains(executionVertex.getID())
+ && execution.getFailureInfo().isPresent()) {
+ failedExecutions.put(execution.getAttemptId(), execution);
+ }
+ }
+
+ return new FailureHandlingResultSnapshot(
+ rootCauseExecutionAttemptId,
+ failureHandlingResult.getError(),
Review comment:
`getError` returns a field which is `@Nullable`. This violates the contract of
the constructor, whose `rootCause` parameter is not annotated as `@Nullable`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,32 +632,29 @@ protected final void archiveGlobalFailure(@Nullable
Throwable failure) {
private void archiveGlobalFailure(@Nullable Throwable failure, long
timestamp) {
exceptionHistory.add(
- exceptionHistoryEntryExtractor.extractGlobalFailure(
- executionGraph.getAllExecutionVertices(), failure,
timestamp));
+ RootExceptionHistoryEntry.fromGlobalFailure(
+ failure, timestamp,
executionGraph.getAllExecutionVertices()));
Review comment:
Here we are still relying on the `ExecutionVertices`. Shall we change it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]