tillrohrmann commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r566868391



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
##########
@@ -64,9 +64,10 @@
      * Returns the exception that caused the job to fail. This is the first root exception that was
      * not recoverable and triggered job failure.
      *
-     * @return failure exception as a string, or {@code "(null)"}
+     * @return failure exception wrapped in a {@link ErrorInfo}, or {@code null} if no exception was
+     *     caught.
      */
-    String getFailureCauseAsString();
+    ErrorInfo getFailureInfo();

Review comment:
       Same here: if this method can return `null`, I would suggest returning an `Optional<ErrorInfo>`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
##########
@@ -64,9 +65,11 @@
      * Returns the exception that caused the job to fail. This is the first root exception that was
      * not recoverable and triggered job failure.
      *
-     * @return failure exception as a string, or {@code "(null)"}
+     * @return {@link ErrorInfo} containing the {@code Throwable} wrapped in a {@link
+     *     SerializedThrowable} and the time it was registered, or {@code null} if no exception was
+     *     caught.
      */
-    String getFailureCauseAsString();
+    ErrorInfo getFailureInfo();

Review comment:
       If this method can return `null`, then I would suggest returning an 
`Optional<ErrorInfo>` instead. This makes the contract clearer.
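
A minimal sketch of how such an `Optional`-based contract could read. `ErrorInfoStub` and `AccessExecutionSketch` are hypothetical stand-ins used only for illustration; they are not the actual Flink types:

```java
import java.util.Optional;

final class OptionalContractSketch {

    /** Hypothetical stand-in for ErrorInfo, reduced to what the sketch needs. */
    static final class ErrorInfoStub {
        private final String exceptionAsString;
        private final long timestamp;

        ErrorInfoStub(String exceptionAsString, long timestamp) {
            this.exceptionAsString = exceptionAsString;
            this.timestamp = timestamp;
        }

        String getExceptionAsString() {
            return exceptionAsString;
        }

        long getTimestamp() {
            return timestamp;
        }
    }

    /** Hypothetical accessor: the return type itself states that there may be no failure. */
    interface AccessExecutionSketch {
        Optional<ErrorInfoStub> getFailureInfo();
    }

    /** Callers handle the empty case explicitly; no null check is needed. */
    static String describe(AccessExecutionSketch execution) {
        return execution
                .getFailureInfo()
                .map(ErrorInfoStub::getExceptionAsString)
                .orElse("(no failure)");
    }

    public static void main(String[] args) {
        AccessExecutionSketch healthy = () -> Optional.empty();
        AccessExecutionSketch failed =
                () -> Optional.of(new ErrorInfoStub("java.lang.RuntimeException: boom", 42L));

        System.out.println(describe(healthy));
        System.out.println(describe(failed));
    }
}
```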

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
##########
@@ -70,14 +69,14 @@ public ArchivedExecution(
             ExecutionAttemptID attemptId,
             int attemptNumber,
             ExecutionState state,
-            String failureCause,
+            ErrorInfo failureCause,

Review comment:
       `@Nullable` missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
##########
@@ -37,7 +36,7 @@
 
     private final ExecutionState state;
 
-    private final String failureCause; // once assigned, never changes
+    private final ErrorInfo failureInfo; // once assigned, never changes

Review comment:
       I guess that this field is `@Nullable`, right?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
##########
@@ -99,23 +103,31 @@ public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable caus
     }
 
     private FailureHandlingResult handleFailure(
+            final ExecutionVertexID failingExecutionVertexId,

Review comment:
       I think `@Nullable` annotation is missing here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -322,8 +322,11 @@ public Throwable getFailureCause() {
     }
 
     @Override
-    public String getFailureCauseAsString() {
-        return ExceptionUtils.stringifyException(getFailureCause());
+    public ErrorInfo getFailureInfo() {
+        return getFailureCause() == null
+                ? null
+                : new ErrorInfo(
+                        new SerializedThrowable(getFailureCause()), getStateTimestamp(FAILED));

Review comment:
       I think wrapping `getFailureCause` in a `SerializedThrowable` is not 
necessary because that's what `ErrorInfo` does in the constructor.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
##########
@@ -351,7 +351,19 @@ private static void compareExecutionVertex(
                 runtimeVertex.getStateTimestamp(ExecutionState.FAILED),
                 archivedVertex.getStateTimestamp(ExecutionState.FAILED));
         assertEquals(
-                runtimeVertex.getFailureCauseAsString(), archivedVertex.getFailureCauseAsString());
+                runtimeVertex.getFailureInfo() == null
+                        ? null
+                        : runtimeVertex.getFailureInfo().getExceptionAsString(),
+                archivedVertex.getFailureInfo() == null
+                        ? null
+                        : archivedVertex.getFailureInfo().getExceptionAsString());

Review comment:
       With `Optional<ErrorInfo>`, this comparison might look a bit nicer.
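
A rough sketch of how the comparison could collapse once both sides expose an `Optional`. `ErrorInfoStub` and `sameFailure` are hypothetical names for illustration, and the sketch assumes the accessor returns `Optional` rather than `null`:

```java
import java.util.Optional;

final class OptionalComparisonSketch {

    /** Hypothetical stand-in for ErrorInfo. */
    static final class ErrorInfoStub {
        private final String exceptionAsString;

        ErrorInfoStub(String exceptionAsString) {
            this.exceptionAsString = exceptionAsString;
        }

        String getExceptionAsString() {
            return exceptionAsString;
        }
    }

    /**
     * Optional.equals compares the contained values, and two empty Optionals are equal,
     * so the nested null-check ternaries are no longer needed.
     */
    static boolean sameFailure(Optional<ErrorInfoStub> runtime, Optional<ErrorInfoStub> archived) {
        return runtime.map(ErrorInfoStub::getExceptionAsString)
                .equals(archived.map(ErrorInfoStub::getExceptionAsString));
    }

    public static void main(String[] args) {
        Optional<ErrorInfoStub> none = Optional.empty();
        Optional<ErrorInfoStub> boom = Optional.of(new ErrorInfoStub("boom"));

        System.out.println(sameFailure(none, none)); // prints true
        System.out.println(sameFailure(boom, none)); // prints false
    }
}
```

In a test, the two mapped `Optional` values could be passed straight to `assertEquals`.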

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -100,17 +116,22 @@ public long getRestartDelayMS() {
         }
     }
 
+    /**
+     * Returns the {@link ExecutionVertexID} of the task causing this failure.
+     *
+     * @return The {@code ExecutionVertexID} or {@code null} if it's a global failure.
+     */
+    public ExecutionVertexID getExecutionVertexIdOfFailedTask() {

Review comment:
       `@Nullable` is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -38,6 +39,9 @@
     /** Delay before the restarting can be conducted. */
     private final long restartDelayMS;
 
+    /** The ExecutionVertexID refering to the ExecutionVertex the failure is originating from. */
+    private final ExecutionVertexID failingExecutionVertexId;

Review comment:
       I think `@Nullable` is missing here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -47,27 +51,39 @@
     /**
      * Creates a result of a set of tasks to restart to recover from the failure.
      *
+     * @param failingExecutionVertexId the {@link ExecutionVertexID} refering to the {@link
+     *     ExecutionVertex} the failure is originating from.

Review comment:
       The explanation of what a `null` value means is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -267,9 +267,16 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
-                                cancelFuture.thenRunAsync(
-                                        restartTasks(executionVertexVersions, globalRecovery),
-                                        getMainThreadExecutor())),
+                                cancelFuture
+                                        .thenRunAsync(
+                                                () ->
+                                                        archiveFromFailureHandlingResult(
+                                                                failureHandlingResult),
+                                                getMainThreadExecutor())
+                                        .thenRunAsync(
+                                                restartTasks(
+                                                        executionVertexVersions, globalRecovery),
+                                                getMainThreadExecutor())),

Review comment:
       Why did you introduce a second `thenRunAsync`?
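
For comparison, a sketch of a single-stage variant, under the assumption that both steps run on the same executor and nothing requires the second one to be a separate stage. The step names are placeholders for the archiving and restart logic in the diff, and the direct executor is a hypothetical stand-in for the main-thread executor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class SingleStageSketch {

    /** Runs both steps, in order, inside one thenRunAsync stage. */
    static List<String> runInOneStage(Executor mainThreadExecutor) {
        List<String> executedSteps = new ArrayList<>();
        CompletableFuture<Void> cancelFuture = CompletableFuture.completedFuture(null);

        cancelFuture
                .thenRunAsync(
                        () -> {
                            executedSteps.add("archive"); // placeholder for archiving the failure
                            executedSteps.add("restart"); // placeholder for restarting the tasks
                        },
                        mainThreadExecutor)
                .join();

        return executedSteps;
    }

    public static void main(String[] args) {
        // A direct executor stands in for the scheduler's main-thread executor.
        Executor directExecutor = Runnable::run;
        System.out.println(runInOneStage(directExecutor)); // prints [archive, restart]
    }
}
```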

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }
 
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new RuntimeException("restartable exception");
+        Range<Long> updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new RuntimeException("failing exception");
+        Range<Long> updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+
+        List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory.size(), is(2));
+
+        // assert restarted attempt
+        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt
+        ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        Throwable actualException =
+                globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+        assertThat(actualException.getCause(), is(failingException));

Review comment:
       One could use `FlinkMatchers.containsCause`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -641,6 +648,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        Set<ExecutionAttemptID> executionAttemptIds =
+                IterableUtils.toStream(executionGraph.getAllExecutionVertices())
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .map(Execution::getAttemptId)
+                        .collect(Collectors.toSet());
+
+        globalTaskFailures.put(
+                executionAttemptIds, new ErrorInfo(failure, System.currentTimeMillis()));
+
+        log.debug("Archive global {}: {}", failure, failure.getMessage());

Review comment:
       What would this log line look like? Wouldn't `failure.getMessage()` be 
duplicated? Maybe also say "Archive global failure...".
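
To make the duplication concrete, a small sketch that mimics the two `{}` substitutions with plain string concatenation (no logging framework involved; `render` is a hypothetical helper). `Throwable.toString()` already appends the message to the class name, so the message shows up twice:

```java
final class LogLineSketch {

    /** Mimics filling "Archive global {}: {}" with (failure, failure.getMessage()). */
    static String render(Throwable failure) {
        // String concatenation calls failure.toString(), which yields "<class name>: <message>".
        return "Archive global " + failure + ": " + failure.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(render(new RuntimeException("boom")));
        // prints: Archive global java.lang.RuntimeException: boom: boom
    }
}
```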

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -641,6 +648,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        Set<ExecutionAttemptID> executionAttemptIds =
+                IterableUtils.toStream(executionGraph.getAllExecutionVertices())
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .map(Execution::getAttemptId)
+                        .collect(Collectors.toSet());
+
+        globalTaskFailures.put(
+                executionAttemptIds, new ErrorInfo(failure, System.currentTimeMillis()));
+
+        log.debug("Archive global {}: {}", failure, failure.getMessage());
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        if (failureHandlingResult.getExecutionVertexIdOfFailedTask() == null) {

Review comment:
       I think it is nicer if you have something like 
`failureHandlingResult.isGlobalFailure()`. 
`failureHandlingResult.getExecutionVertexIdOfFailedTask() == null` is not very 
expressive.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -100,17 +116,22 @@ public long getRestartDelayMS() {
         }
     }
 
+    /**
+     * Returns the {@link ExecutionVertexID} of the task causing this failure.
+     *
+     * @return The {@code ExecutionVertexID} or {@code null} if it's a global failure.
+     */
+    public ExecutionVertexID getExecutionVertexIdOfFailedTask() {

Review comment:
       Alternatively, you can introduce an `isLocalFailure()` method and let this 
method fail if `failingExecutionVertexId == null`. That way you enforce the 
contract that you can only ask for the failing execution vertex id if it is a 
local failure.
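
A minimal sketch of that contract. `FailureHandlingResultSketch` and its factory methods are hypothetical, and a `String` stands in for `ExecutionVertexID`; the choice of `IllegalStateException` is an assumption, not something taken from the PR:

```java
final class FailureHandlingResultSketch {

    /** Null only for global failures; String stands in for ExecutionVertexID. */
    private final String failingExecutionVertexId;

    private FailureHandlingResultSketch(String failingExecutionVertexId) {
        this.failingExecutionVertexId = failingExecutionVertexId;
    }

    static FailureHandlingResultSketch local(String failingExecutionVertexId) {
        return new FailureHandlingResultSketch(failingExecutionVertexId);
    }

    static FailureHandlingResultSketch global() {
        return new FailureHandlingResultSketch(null);
    }

    boolean isLocalFailure() {
        return failingExecutionVertexId != null;
    }

    /** Fails fast instead of returning null, so callers must check isLocalFailure() first. */
    String getExecutionVertexIdOfFailedTask() {
        if (!isLocalFailure()) {
            throw new IllegalStateException(
                    "The failing execution vertex is only available for local failures.");
        }
        return failingExecutionVertexId;
    }

    public static void main(String[] args) {
        FailureHandlingResultSketch result = local("vertex-1");
        if (result.isLocalFailure()) {
            System.out.println(result.getExecutionVertexIdOfFailedTask()); // prints vertex-1
        }
    }
}
```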

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -47,27 +51,39 @@
     /**
      * Creates a result of a set of tasks to restart to recover from the failure.
      *
+     * @param failingExecutionVertexId the {@link ExecutionVertexID} refering to the {@link
+     *     ExecutionVertex} the failure is originating from.
+     * @param cause the exception that caused this failure.
      * @param verticesToRestart containing task vertices to restart to recover from the failure
      * @param restartDelayMS indicate a delay before conducting the restart
      */
     private FailureHandlingResult(
-            Set<ExecutionVertexID> verticesToRestart, long restartDelayMS, boolean globalFailure) {
+            ExecutionVertexID failingExecutionVertexId,

Review comment:
       `@Nullable` is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -164,6 +168,8 @@
     private final Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap;
 
     private final ComponentMainThreadExecutor mainThreadExecutor;
+    private final Map<ExecutionAttemptID, ErrorInfo> localTaskFailures = new HashMap<>();
+    private final Map<Set<ExecutionAttemptID>, ErrorInfo> globalTaskFailures = new HashMap<>();

Review comment:
       What do we need the maps for here? Wouldn't a single `List<ErrorInfo>` 
be good enough?
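
A sketch of the single-list variant, assuming the history only needs to preserve the order in which failures were archived. `ExceptionHistorySketch` and `Entry` are hypothetical names; `Entry` stands in for `ErrorInfo`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ExceptionHistorySketch {

    /** Hypothetical stand-in for ErrorInfo: a failure plus the time it was recorded. */
    static final class Entry {
        private final Throwable exception;
        private final long timestamp;

        Entry(Throwable exception, long timestamp) {
            this.exception = exception;
            this.timestamp = timestamp;
        }

        Throwable getException() {
            return exception;
        }

        long getTimestamp() {
            return timestamp;
        }
    }

    /** One list for both kinds of failure; insertion order is the history order. */
    private final List<Entry> exceptionHistory = new ArrayList<>();

    void archiveLocalFailure(Throwable failure, long timestamp) {
        exceptionHistory.add(new Entry(failure, timestamp));
    }

    void archiveGlobalFailure(Throwable failure, long timestamp) {
        exceptionHistory.add(new Entry(failure, timestamp));
    }

    List<Entry> getExceptionHistory() {
        return Collections.unmodifiableList(exceptionHistory);
    }

    public static void main(String[] args) {
        ExceptionHistorySketch scheduler = new ExceptionHistorySketch();
        scheduler.archiveLocalFailure(new RuntimeException("restartable exception"), 1L);
        scheduler.archiveGlobalFailure(new RuntimeException("failing exception"), 2L);
        System.out.println(scheduler.getExceptionHistory().size()); // prints 2
    }
}
```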

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }
 
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new RuntimeException("restartable exception");
+        Range<Long> updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new RuntimeException("failing exception");
+        Range<Long> updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+
+        List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory.size(), is(2));
+
+        // assert restarted attempt
+        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt
+        ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        Throwable actualException =
+                globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+        assertThat(actualException.getCause(), is(failingException));
+        assertThat(
+                globalFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+        assertThat(
+                globalFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+    }
+
+    private static Range<Long> initiateFailure(
+            DefaultScheduler scheduler,
+            JobID jobId,
+            ExecutionAttemptID executionAttemptID,
+            Throwable exception) {
+        long start = System.currentTimeMillis();

Review comment:
       `System.currentTimeMillis()` can be susceptible to clock resets. Hence, 
it can cause some test instabilities. Unfortunately, `Execution` also uses this 
to determine the state timestamp.
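
One possible direction, sketched under the assumption that the timestamp source could be injected, which the PR does not do. `StateTimestamps` and its methods are hypothetical; with a supplied clock the test controls time instead of bracketing `System.currentTimeMillis()` calls:

```java
import java.util.function.LongSupplier;

final class InjectableClockSketch {

    /** Hypothetical holder that records a state timestamp from an injected clock. */
    static final class StateTimestamps {
        private final LongSupplier clock;
        private long failedTimestamp;

        StateTimestamps(LongSupplier clock) {
            this.clock = clock;
        }

        void markFailed() {
            failedTimestamp = clock.getAsLong();
        }

        long getFailedTimestamp() {
            return failedTimestamp;
        }
    }

    public static void main(String[] args) {
        // Production code could pass System::currentTimeMillis; a test passes a manual clock.
        long[] now = {1_000L};
        StateTimestamps timestamps = new StateTimestamps(() -> now[0]);

        now[0] = 1_500L; // advance the manual clock deterministically
        timestamps.markFailed();

        System.out.println(timestamps.getFailedTimestamp()); // prints 1500
    }
}
```

With a manual clock, the test can assert an exact timestamp rather than a range, and a wall-clock reset cannot affect it.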




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

