tillrohrmann commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r578407995
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -153,7 +152,7 @@
private LogicalSlot assignedResource;
- private Throwable failureCause; // once assigned, never changes
+ private Optional<ErrorInfo> failureCause = Optional.empty(); // once assigned, never changes
Review comment:
The comment seems to be wrong now: the field is initialized with `Optional.empty()` and reassigned once the execution fails.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -1133,7 +1128,10 @@ private void processFail(
checkState(transitionState(current, FAILED, t));
// success (in a manner of speaking)
- this.failureCause = t;
+ if (t != null) {
+ // we only set the failureCause if an error is passed (see FLINK-21376)
+ this.failureCause = Optional.of(new ErrorInfo(t, getStateTimestamp(FAILED)));
+ }
Review comment:
Can't we say that if `t == null`, then we create an unknown failure cause?
`new FlinkException("Unknown cause for Execution failure. This might be caused by FLINK-21376")`?
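A minimal sketch of the suggested fallback. It assumes a simplified stand-in for Flink's `ErrorInfo` (cause plus timestamp) and uses a plain `Exception` in place of `FlinkException` so it compiles without Flink on the classpath. `ErrorInfoStub`, `ExecutionStub` and the `failedTimestamp` parameter are hypothetical simplifications, not the actual `Execution` API.

```java
import java.util.Optional;

/** Hypothetical stand-in for Flink's ErrorInfo: a failure cause plus the time it was recorded. */
final class ErrorInfoStub {
    private final Throwable exception;
    private final long timestamp;

    ErrorInfoStub(Throwable exception, long timestamp) {
        this.exception = exception;
        this.timestamp = timestamp;
    }

    Throwable getException() { return exception; }

    long getTimestamp() { return timestamp; }
}

/** Hypothetical, heavily reduced version of Execution's failure bookkeeping. */
final class ExecutionStub {
    // Empty until the execution fails; assigned when processFail is called.
    private Optional<ErrorInfoStub> failureCause = Optional.empty();

    void processFail(Throwable t, long failedTimestamp) {
        // Never leave the cause empty: substitute a placeholder if no error was passed.
        // (Plain Exception used here instead of FlinkException to stay self-contained.)
        Throwable cause =
                t != null
                        ? t
                        : new Exception(
                                "Unknown cause for Execution failure. This might be caused by FLINK-21376");
        this.failureCause = Optional.of(new ErrorInfoStub(cause, failedTimestamp));
    }

    Optional<ErrorInfoStub> getFailureInfo() { return failureCause; }
}
```

With this shape, callers of `getFailureInfo()` can rely on a failed execution always reporting some cause, rather than having to special-case the FLINK-21376 situation.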
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,6 +641,33 @@ public void cancel() {
return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
}
+ protected void archiveGlobalFailure(Throwable failure) {
+ taskFailureHistory.add(
+ new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+ log.debug("Archive global failure.", failure);
+ }
+
+ protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+ Optional<Execution> executionOptional =
+ failureHandlingResult
+ .getExecutionVertexIdOfFailedTask()
+ .map(this::getExecutionVertex)
+ .map(ExecutionVertex::getCurrentExecutionAttempt);
Review comment:
I think `archiveFromFailureHandlingResult` can also be called when
handling a global failover. In this case `executionOptional` would be empty and
as a consequence we won't record a failure cause.
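A minimal sketch of one way to cover the global case, assuming hypothetical stand-ins: `FailureResultStub` exposes an optional failing-vertex ID and the triggering error, and the history is a plain `List<String>`. When no vertex ID is present, the failure is archived as a global failure instead of being silently dropped. An explicit if/else is used rather than `Optional.ifPresentOrElse`, which only exists since Java 9.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for FailureHandlingResult: the vertex ID is empty for global failures. */
final class FailureResultStub {
    private final String failingVertexId; // null for a global failover
    private final Throwable error;

    FailureResultStub(String failingVertexId, Throwable error) {
        this.failingVertexId = failingVertexId;
        this.error = error;
    }

    Optional<String> getExecutionVertexIdOfFailedTask() {
        return Optional.ofNullable(failingVertexId);
    }

    Throwable getError() { return error; }
}

/** Hypothetical archiver that records both local and global failures. */
final class FailureArchiverStub {
    final List<String> history = new ArrayList<>();

    void archiveFromFailureHandlingResult(FailureResultStub result) {
        Optional<String> vertexId = result.getExecutionVertexIdOfFailedTask();
        if (vertexId.isPresent()) {
            // Local failure: attribute the error to the failing vertex.
            history.add("local[" + vertexId.get() + "]: " + result.getError().getMessage());
        } else {
            // Global failover: there is no vertex, but the cause must still be recorded.
            history.add("global: " + result.getError().getMessage());
        }
    }
}
```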
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,6 +641,33 @@ public void cancel() {
return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
}
+ protected void archiveGlobalFailure(Throwable failure) {
+ taskFailureHistory.add(
+ new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+ log.debug("Archive global failure.", failure);
+ }
+
+ protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+ Optional<Execution> executionOptional =
+ failureHandlingResult
+ .getExecutionVertexIdOfFailedTask()
+ .map(this::getExecutionVertex)
+ .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+ executionOptional.ifPresent(
+ execution ->
+ execution
+ .getFailureInfo()
+ .ifPresent(
+ failureInfo -> {
+ taskFailureHistory.add(failureInfo);
+ log.debug(
+ "Archive local failure causing attempt {} to fail: {}",
+ execution.getAttemptId(),
+ failureInfo.getExceptionAsString());
+ }));
Review comment:
Why do we have to ask the execution for the failure cause? If we are
only interested in the root cause, doesn't `failureHandlingResult` contain all
the required information?
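A minimal sketch of archiving straight from the handling result, assuming (hypothetically) that the result carries both the triggering error and the time at which the failure was handled. `HandlingResult`, its `getTimestamp()` accessor, `ErrorEntry` and `ExceptionHistory` are illustrative stand-ins, not existing Flink classes. Because the entry is built only from the result, no `Execution` lookup is needed, and local and global failures go through the same code path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for ErrorInfo: a failure cause plus its timestamp. */
final class ErrorEntry {
    final Throwable exception;
    final long timestamp;

    ErrorEntry(Throwable exception, long timestamp) {
        this.exception = exception;
        this.timestamp = timestamp;
    }
}

/** Hypothetical result type that carries everything needed for archiving. */
final class HandlingResult {
    private final Throwable error;
    private final long timestamp;

    HandlingResult(Throwable error, long timestamp) {
        this.error = error;
        this.timestamp = timestamp;
    }

    Throwable getError() { return error; }

    long getTimestamp() { return timestamp; }
}

/** Hypothetical exception history fed directly from handling results. */
final class ExceptionHistory {
    private final List<ErrorEntry> entries = new ArrayList<>();

    /** Archives from the result alone; the failing Execution is never consulted. */
    void archive(HandlingResult result) {
        entries.add(new ErrorEntry(result.getError(), result.getTimestamp()));
    }

    List<ErrorEntry> getEntries() { return Collections.unmodifiableList(entries); }
}
```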
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -38,6 +42,12 @@
/** Delay before the restarting can be conducted. */
private final long restartDelayMS;
+ /**
+ * The ExecutionVertexID refering to the ExecutionVertex the failure is originating from or
+ * {@code null} if it's a global failure.
+ */
+ @Nullable private final ExecutionVertexID failingExecutionVertexId;
+
/** Reason why the failure is not recoverable. */
Review comment:
The JavaDoc seems wrong. If I am not mistaken, then we will also set
this field if the failure is recoverable, right?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +871,87 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
}
+ @Test
+ public void testExceptionHistoryWithRestartableFailure() {
Review comment:
I think we also need a test for global failover.
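A self-contained sketch of what such a test could assert, using a hypothetical `SchedulerStub` instead of the real `DefaultSchedulerTest` fixtures (whose helpers are not shown here). It is written as plain Java with explicit `AssertionError`s rather than JUnit so it runs standalone; the expectation is that after a global failure the exception history holds exactly one entry, and that entry is the global cause.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical scheduler stand-in: records every handled global failure. */
final class SchedulerStub {
    final List<Throwable> exceptionHistory = new ArrayList<>();

    void handleGlobalFailure(Throwable cause) {
        // A global failover has no failing vertex, but its cause must still be archived.
        exceptionHistory.add(cause);
    }
}

final class GlobalFailoverHistoryTest {
    /** Mirrors the shape a JUnit test for the global-failover case could take. */
    static void testExceptionHistoryWithGlobalFailure() {
        SchedulerStub scheduler = new SchedulerStub();
        Exception expected = new Exception("Expected global failure");

        scheduler.handleGlobalFailure(expected);

        if (scheduler.exceptionHistory.size() != 1) {
            throw new AssertionError("expected exactly one archived failure");
        }
        if (scheduler.exceptionHistory.get(0) != expected) {
            throw new AssertionError("archived failure is not the global cause");
        }
    }
}
```

In the real test the same two checks would run against the scheduler's exception history after triggering a global failure through the scheduler under test.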
----------------------------------------------------------------
This is an automated message from the Apache Git Service.