XComp commented on a change in pull request #15898:
URL: https://github.com/apache/flink/pull/15898#discussion_r659125892



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -110,6 +121,22 @@ ExecutionGraph getExecutionGraph() {
         return executionGraph;
     }
 
+    ExecutionVertex getExecutionVertex(final ExecutionVertexID 
executionVertexId) {
+        return executionGraph
+                .getAllVertices()
+                .get(executionVertexId.getJobVertexId())
+                .getTaskVertices()[executionVertexId.getSubtaskIndex()];
+    }
+
+    @Nullable
+    protected ExecutionVertexID getExecutionVertexId(ExecutionAttemptID id) {
+        Execution execution = 
getExecutionGraph().getRegisteredExecutions().get(id);
+        if (execution == null) {

Review comment:
       Looks like I missed this last time: this seems to be wrong, doesn't it? 
If this method returns `null`, the failure is interpreted as a global one. 
That makes this the wrong location for the decision. I'd propose that the 
method expects the ID to be present. Setting the `null` value should be done 
explicitly in the `handleGlobalFailure` method. Alternatively, you could 
follow what `DefaultScheduler`/`SchedulerBase` are doing: return an 
`Optional` and do the [state 
check](https://github.com/apache/flink/blob/d105eb32cda5f903eebc7a1019de44fdc1ec8472/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L668)
 in case of a successful update.
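
A minimal sketch of the `Optional` variant, under loud assumptions: the class 
`TaskFailureRoutingSketch`, its method names, and the use of plain strings as 
IDs are hypothetical stand-ins, not the actual Flink code. It only illustrates 
the idea that the lookup reports absence, while the caller decides what 
absence means.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins; attempt and vertex IDs are plain strings here.
final class TaskFailureRoutingSketch {
    private final Map<String, String> vertexIdByAttemptId = new HashMap<>();

    void register(String attemptId, String vertexId) {
        vertexIdByAttemptId.put(attemptId, vertexId);
    }

    // The lookup only reports presence or absence; it makes no local/global decision.
    Optional<String> findVertexId(String attemptId) {
        return Optional.ofNullable(vertexIdByAttemptId.get(attemptId));
    }

    // Stand-in for a state update that succeeds only for registered attempts.
    private boolean updateState(String attemptId) {
        return vertexIdByAttemptId.containsKey(attemptId);
    }

    String onTaskFailed(String attemptId) {
        Optional<String> vertexId = findVertexId(attemptId);
        if (!updateState(attemptId)) {
            return "update rejected for " + attemptId;
        }
        // State check: a successful update implies that the vertex ID is known.
        String id = vertexId.orElseThrow(
                () -> new IllegalStateException("Unknown attempt " + attemptId));
        return "local failure of " + id;
    }

    // The global path states the absence of a failing vertex explicitly.
    String onGlobalFailure() {
        return "global failure (no failing vertex)";
    }
}
```

With this shape, an unknown attempt ID can no longer be mistaken for a global 
failure, because only `onGlobalFailure` produces that outcome.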

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +809,140 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
 
         assertThat(
                 scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new 
Exception("test")))
+                        .howToHandleFailure(
+                                null, new SuppressRestartsException(new 
Exception("test")))
                         .canRestart(),
                 is(false));
     }
 
+    @Test
+    public void testExceptionHistoryWithGlobalFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 1;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("Expected Global 
Exception");
+        final long start = System.currentTimeMillis();
+        final OneShotLatch latch = new OneShotLatch();
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.handleGlobalFailure(expectedException);
+                    latch.trigger();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
+        assertThat(
+                
failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+        assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+    }
+
+    @Test
+    public void testExceptionHistoryWithTaskFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 4;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("local failure");
+        Iterable<ArchivedExecutionVertex> executionVertices =
+                
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+
+        ExecutionAttemptID attemptId =
+                
executionVertices.iterator().next().getCurrentExecutionAttempt().getAttemptId();
+        final long start = System.currentTimeMillis();
+        final OneShotLatch latch = new OneShotLatch();
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.updateTaskExecutionState(
+                            new TaskExecutionStateTransition(
+                                    new TaskExecutionState(
+                                            attemptId, ExecutionState.FAILED, 
expectedException)));
+                    latch.trigger();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
+        assertThat(
+                
failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));

Review comment:
       ```suggestion
           assertThat(failure.getTaskManagerLocation(), is(nullValue()));
   ```
   nit: This code already imports `org.hamcrest.core.Is.is` statically, so the 
`Matchers.` prefix is not necessary here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -59,27 +59,48 @@
     public static FailureHandlingResultSnapshot create(
             FailureHandlingResult failureHandlingResult,
             Function<ExecutionVertexID, Execution> latestExecutionLookup) {
+        return create(
+                failureHandlingResult.getExecutionVertexIdOfFailedTask(),
+                failureHandlingResult.getError(),
+                failureHandlingResult.getVerticesToRestart(),
+                failureHandlingResult.getTimestamp(),
+                latestExecutionLookup);
+    }
+
+    /**
+     * Creates a {@code FailureHandlingResultSnapshot} based on passed 
parameters.
+     *
+     * @param failingExecutionVertexId an {@link Optional} of the {@link 
ExecutionVertexID} the
+     *     failure originates from, or {@code None}.
+     * @param rootCause the failure reason.
+     * @param concurrentVertexIds {@link ExecutionVertexID} Task vertices 
concurrently failing with
+     *     the {@code failingExecutionVertexID}.
+     * @param timestamp the failure timestamp.
+     * @param latestExecutionLookup The look-up function for retrieving the 
latest {@link Execution}
+     *     instance for a given {@link ExecutionVertexID}.
+     * @return The {@code FailureHandlingResultSnapshot}.
+     */
+    public static FailureHandlingResultSnapshot create(
+            Optional<ExecutionVertexID> failingExecutionVertexId,

Review comment:
       Passing an `Optional` here causes unnecessary wrapping in 
[StateWithExecutionGraph:325](https://github.com/apache/flink/blob/d105eb32cda5f903eebc7a1019de44fdc1ec8472/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java#L325)
 just to have it unwrapped again in the method. Instead, we could call 
`failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null)` in the 
factory method above and make this parameter `@Nullable`.
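
A minimal sketch of that shape, assuming hypothetical stand-ins 
(`SnapshotFactorySketch`, `Result`, string IDs, and a string in place of the 
snapshot); it is not the real `FailureHandlingResultSnapshot` API. The 
convenience factory unwraps the `Optional` once, and the general factory takes 
a nullable value, so callers that already hold a plain ID do not need to wrap 
it.

```java
import java.util.Optional;

// Hypothetical stand-ins; a String plays the role of the snapshot.
final class SnapshotFactorySketch {

    /** Stand-in for a failure handling result with an optional failing vertex. */
    static final class Result {
        private final String failedVertexId; // null for a global failure

        Result(String failedVertexId) {
            this.failedVertexId = failedVertexId;
        }

        Optional<String> getFailedVertexId() {
            return Optional.ofNullable(failedVertexId);
        }
    }

    // Convenience factory: unwraps the Optional exactly once.
    static String fromResult(Result result) {
        return create(result.getFailedVertexId().orElse(null));
    }

    // General factory: the parameter would be annotated @Nullable in real code.
    static String create(String failingVertexId) {
        return failingVertexId == null
                ? "snapshot[global]"
                : "snapshot[" + failingVertexId + "]";
    }
}
```

A caller that only has a nullable ID can then call `create(id)` directly 
instead of wrapping it in `Optional.ofNullable(id)` first.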

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -81,11 +83,15 @@ public void cancel() {
 
     @Override
     public void handleGlobalFailure(Throwable cause) {
-        handleAnyFailure(cause);
+        handleAnyFailure(null, cause);
     }
 
-    private void handleAnyFailure(Throwable cause) {
-        final FailureResult failureResult = context.howToHandleFailure(cause);
+    private void handleAnyFailure(
+            @Nullable ExecutionVertexID failingExecutionVertexId, Throwable 
cause) {
+        final FailureResult failureResult =
+                context.howToHandleFailure(failingExecutionVertexId, cause);
+
+        archiveExecutionFailure(failingExecutionVertexId, cause);

Review comment:
       Thinking about it once more: I guess this is not the right location to 
archive the failure, considering that we also want to identify concurrent 
failures. We haven't addressed that in this PR yet.
   
   To achieve that, we have to collect the failure snapshot here and pass it 
over to the next state (failure or restart). Any failure that pops up in these 
subsequent states has to be collected as well. The archiving should happen when 
re-instantiating the `ExecutionGraph`.
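
A rough sketch of that flow, assuming hypothetical stand-ins throughout 
(`FailureCollectionSketch`, `PendingFailure`, `FollowUpState`); none of these 
are Flink classes, and the real state machine has more states and arguments. 
It only shows the ordering: collect on the first failure, keep collecting in 
the follow-up state, archive once when a new graph would be created.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these are Flink classes.
final class FailureCollectionSketch {

    /** Root failure plus any failures observed before the graph is rebuilt. */
    static final class PendingFailure {
        final Throwable rootCause;
        final List<Throwable> concurrentFailures = new ArrayList<>();

        PendingFailure(Throwable rootCause) {
            this.rootCause = rootCause;
        }
    }

    /** Stand-in for the state entered after the first failure. */
    static final class FollowUpState {
        private final PendingFailure pending;
        private final List<PendingFailure> history;

        FollowUpState(PendingFailure pending, List<PendingFailure> history) {
            this.pending = pending;
            this.history = history;
        }

        // Failures arriving in this state are collected, not archived one by one.
        void onFailure(Throwable cause) {
            pending.concurrentFailures.add(cause);
        }

        // Archive once, at the point where a new graph would be instantiated.
        void onNewExecutionGraph() {
            history.add(pending);
        }
    }

    /** Stand-in for the executing state: it only starts the collection. */
    static FollowUpState onFirstFailure(Throwable cause, List<PendingFailure> history) {
        return new FollowUpState(new PendingFailure(cause), history);
    }
}
```

The history stays empty until `onNewExecutionGraph` runs, so a failure that 
arrives while restarting ends up in the same entry as the root cause.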

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -81,11 +83,15 @@ public void cancel() {
 
     @Override
     public void handleGlobalFailure(Throwable cause) {
-        handleAnyFailure(cause);
+        handleAnyFailure(null, cause);
     }
 
-    private void handleAnyFailure(Throwable cause) {
-        final FailureResult failureResult = context.howToHandleFailure(cause);
+    private void handleAnyFailure(
+            @Nullable ExecutionVertexID failingExecutionVertexId, Throwable 
cause) {
+        final FailureResult failureResult =
+                context.howToHandleFailure(failingExecutionVertexId, cause);
+
+        archiveExecutionFailure(failingExecutionVertexId, cause);

Review comment:
       We should also cover this in the corresponding 
`StateWithExecutionGraphTest` test implementations.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -178,9 +192,13 @@ private void completeOperationAndGoToFinished(String 
savepoint) {
         
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
     }
 
-    private void handleAnyFailure(Throwable cause) {
+    private void handleAnyFailure(

Review comment:
       Thanks for sharing your thoughts. You're right: Tackling the failure 
handling for the `AdaptiveScheduler` might be a bit out of scope of this 
ticket. There are some next steps that include making the failure handling of 
the `AdaptiveScheduler` more sophisticated. That might be the better place to 
fix that. Let's focus on the exception history for now.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +801,140 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
 
         assertThat(
                 scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new 
Exception("test")))
+                        .howToHandleFailure(
+                                null, new SuppressRestartsException(new 
Exception("test")))
                         .canRestart(),
                 is(false));
     }
 
+    @Test
+    public void testExceptionHistoryWithGlobalFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 1;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("Expected Global 
Exception");
+        final long start = System.currentTimeMillis();
+        final CountDownLatch latch = new CountDownLatch(1);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.handleGlobalFailure(expectedException);
+                    latch.countDown();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
+        assertThat(
+                
failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+        assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+    }
+
+    @Test
+    public void testExceptionHistoryWithTaskFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 4;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("local failure");
+        Iterable<ArchivedExecutionVertex> executionVertices =
+                
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+
+        ExecutionAttemptID attemptId =
+                
executionVertices.iterator().next().getCurrentExecutionAttempt().getAttemptId();
+        final long start = System.currentTimeMillis();
+        final CountDownLatch latch = new CountDownLatch(1);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.updateTaskExecutionState(
+                            new TaskExecutionStateTransition(
+                                    new TaskExecutionState(
+                                            attemptId, ExecutionState.FAILED, 
expectedException)));
+                    latch.countDown();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
+        assertThat(
+                
failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+        assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));

Review comment:
       I think that's a good idea. That makes the test more precise as well.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -763,7 +771,8 @@ public void testHowToHandleFailureRejectedByStrategy() 
throws Exception {
                         
.setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
                         .build();
 
-        assertThat(scheduler.howToHandleFailure(new 
Exception("test")).canRestart(), is(false));
+        assertThat(
+                scheduler.howToHandleFailure(null, new 
Exception("test")).canRestart(), is(false));

Review comment:
       Thanks for addressing this. Could you move this case into its own test 
method?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -108,7 +114,10 @@
 import static 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;

Review comment:
       nit: I know it's not caused by you, but could you replace the 
`org.junit.Assert.assertThat` import with `org.hamcrest.MatcherAssert.assertThat` 
in a hotfix commit to get rid of the deprecation warning?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -293,9 +305,19 @@ void goToFailing(
     static final class FailureResult {
         @Nullable private final Duration backoffTime;
 
+        /**
+         * the {@link ExecutionVertexID} refering to the {@link 
ExecutionVertex} the failure is

Review comment:
       ```suggestion
            * The {@link ExecutionVertexID} referring to the {@link 
ExecutionVertex} the failure is
   ```
   nit

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -346,5 +393,12 @@ abstract boolean updateTaskExecutionState(
          *     Finished} state
          */
         void goToFinished(ArchivedExecutionGraph archivedExecutionGraph);
+
+        /**
+         * Archive the details of an execution failure for future retrieval 
and inspection.
+         *
+         * @param failureHandlingResultSnapshot

Review comment:
       ```suggestion
            * @param failureHandlingResultSnapshot The {@link 
FailureHandlingResultSnapshot} holding the failure information that needs to be 
archived.
   ```
   nit: just to please the IDE and remove a warning.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +809,140 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
 
         assertThat(
                 scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new 
Exception("test")))
+                        .howToHandleFailure(
+                                null, new SuppressRestartsException(new 
Exception("test")))
                         .canRestart(),
                 is(false));
     }
 
+    @Test
+    public void testExceptionHistoryWithGlobalFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 1;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("Expected Global 
Exception");
+        final long start = System.currentTimeMillis();
+        final OneShotLatch latch = new OneShotLatch();
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.handleGlobalFailure(expectedException);
+                    latch.trigger();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
+        assertThat(
+                
failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+        assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+    }
+
+    @Test
+    public void testExceptionHistoryWithTaskFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 4;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("local failure");
+        Iterable<ArchivedExecutionVertex> executionVertices =
+                
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+
+        ExecutionAttemptID attemptId =
+                
executionVertices.iterator().next().getCurrentExecutionAttempt().getAttemptId();
+        final long start = System.currentTimeMillis();
+        final OneShotLatch latch = new OneShotLatch();
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.updateTaskExecutionState(
+                            new TaskExecutionStateTransition(
+                                    new TaskExecutionState(
+                                            attemptId, ExecutionState.FAILED, 
expectedException)));
+                    latch.trigger();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
+        assertThat(
+                
failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+        assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+    }
+

Review comment:
       Can you add the concurrent failure test here as well? This should fail 
right now since we're not covering the failure archiving in the restart state.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


