XComp commented on a change in pull request #15898:
URL: https://github.com/apache/flink/pull/15898#discussion_r649872391



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -145,14 +151,14 @@
  */
 public class AdaptiveScheduler
         implements SchedulerNG,
-                Created.Context,
-                WaitingForResources.Context,
-                CreatingExecutionGraph.Context,
-                Executing.Context,
-                Restarting.Context,
-                Failing.Context,
-                Finished.Context,
-                StopWithSavepoint.Context {
+        Created.Context,
+        WaitingForResources.Context,
+        CreatingExecutionGraph.Context,
+        Executing.Context,
+        Restarting.Context,
+        Failing.Context,
+        Finished.Context,
+        StopWithSavepoint.Context {

Review comment:
       Sorry for the late response. I am still busy with other stuff. I hope to 
get back to you today or on Monday.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -59,27 +59,49 @@
     public static FailureHandlingResultSnapshot create(
             FailureHandlingResult failureHandlingResult,
             Function<ExecutionVertexID, Execution> latestExecutionLookup) {
+        return create(
+                failureHandlingResult.getExecutionVertexIdOfFailedTask(),
+                failureHandlingResult.getError(),
+                failureHandlingResult.getVerticesToRestart(),
+                failureHandlingResult.getTimestamp(),
+                latestExecutionLookup);
+    }
+
+    /**
+     * Creates a {@code FailureHandlingResultSnapshot} based on the passed 
{@link
+     * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.

Review comment:
       Looks like a copy&paste error for the JavaDoc. :-)

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +801,140 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
 
         assertThat(
                 scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new 
Exception("test")))
+                        .howToHandleFailure(
+                                null, new SuppressRestartsException(new 
Exception("test")))
                         .canRestart(),
                 is(false));
     }
 
+    @Test
+    public void testExceptionHistoryWithGlobalFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 1;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("Expected Global 
Exception");
+        final long start = System.currentTimeMillis();
+        final CountDownLatch latch = new CountDownLatch(1);

Review comment:
       FYI: Alternatively, you could use the `OneShotLatch` here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
##########
@@ -74,8 +74,8 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;

Review comment:
       `Function` is still used in this test class (in 
`MockExecutionJobVertex`) and shouldn't be removed.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -880,6 +891,19 @@ public void goToFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
         transitionToState(new Finished.Factory(this, archivedExecutionGraph, 
LOG));
     }
 
+    @Override
+    public void archiveFailure(FailureHandlingResultSnapshot 
failureHandlingResultSnapshot) {
+        exceptionHistory.add(
+                RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(
+                        failureHandlingResultSnapshot));
+    }
+
+    private Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
+        final Collection<RootExceptionHistoryEntry> copy = new 
ArrayList<>(exceptionHistory.size());
+        exceptionHistory.forEach(copy::add);
+        return copy;

Review comment:
       We could think of moving this logic into `BoundedFIFOQueue` considering 
that `SchedulerBase` uses the exact same code. WDYT?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -293,9 +316,19 @@ void goToFailing(
     static final class FailureResult {

Review comment:
       I'm wondering whether we should introduce a unit test for 
`FailureResult` considering that it becomes more "powerful". And, maybe, moving 
it into `AdaptiveScheduler` might make sense? WDYT?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -763,7 +771,8 @@ public void testHowToHandleFailureRejectedByStrategy() 
throws Exception {
                         
.setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
                         .build();
 
-        assertThat(scheduler.howToHandleFailure(new 
Exception("test")).canRestart(), is(false));
+        assertThat(
+                scheduler.howToHandleFailure(null, new 
Exception("test")).canRestart(), is(false));

Review comment:
       Theoretically, we would have to test passing a non-null value here as 
well for the `failingExecutionVertexId` parameter. Introducing a 
`FailureResultTest` as mentioned above would free us from doing that.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -122,6 +132,15 @@ boolean 
updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState
         return successfulUpdate;
     }
 
+    @Nullable
+    private ExecutionVertexID getExcutionVertexId(ExecutionAttemptID id) {
+        Execution execution = 
getExecutionGraph().getRegisteredExecutions().get(id);
+        if (execution == null) {
+            return null;
+        }
+        return execution.getVertex().getID();
+    }
+

Review comment:
       I think we could move this method into `StateWithExecutionGraph`. It 
feels like a utility method for accessing the `ExecutionGraph`. That would also 
remove the code redundancy.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -122,6 +132,15 @@ boolean 
updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState
         return successfulUpdate;
     }
 
+    @Nullable
+    private ExecutionVertexID getExcutionVertexId(ExecutionAttemptID id) {

Review comment:
       ```suggestion
       private ExecutionVertexID getExecutionVertexId(ExecutionAttemptID id) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -178,9 +192,13 @@ private void completeOperationAndGoToFinished(String 
savepoint) {
         
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
     }
 
-    private void handleAnyFailure(Throwable cause) {
+    private void handleAnyFailure(

Review comment:
       I'm still puzzled by the `handleAnyFailure` method (I know that this is 
not directly related to your change). But it feels wrong to have this code 
redundancy between `Executing` and `StopWithSavepoint`. We hesitated in the 
first place to let `StopWithSavepoint` inherit from `Executing` since it would 
make the code harder to read.
   One other option is to move it out of the state implementations into 
`AdaptiveScheduler`. Although, I feel like handling a failure is the 
responsibility of the state semantically. Just to trigger a discussion here: 
What's your thought on that one?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +801,140 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
 
         assertThat(
                 scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new 
Exception("test")))
+                        .howToHandleFailure(
+                                null, new SuppressRestartsException(new 
Exception("test")))
                         .canRestart(),
                 is(false));
     }
 
+    @Test
+    public void testExceptionHistoryWithGlobalFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 1;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("Expected Global 
Exception");
+        final long start = System.currentTimeMillis();
+        final CountDownLatch latch = new CountDownLatch(1);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.handleGlobalFailure(expectedException);
+                    latch.countDown();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
+        assertThat(
+                
failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+        assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+    }
+
+    @Test
+    public void testExceptionHistoryWithTaskFailure() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final int numAvailableSlots = 4;
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
+
+        final Exception expectedException = new Exception("local failure");
+        Iterable<ArchivedExecutionVertex> executionVertices =
+                
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+
+        ExecutionAttemptID attemptId =
+                
executionVertices.iterator().next().getCurrentExecutionAttempt().getAttemptId();
+        final long start = System.currentTimeMillis();
+        final CountDownLatch latch = new CountDownLatch(1);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.updateTaskExecutionState(
+                            new TaskExecutionStateTransition(
+                                    new TaskExecutionState(
+                                            attemptId, ExecutionState.FAILED, 
expectedException)));
+                    latch.countDown();
+                });
+
+        latch.await();
+
+        Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final long end = System.currentTimeMillis();
+
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+
+        RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
+        assertThat(
+                
failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                Matchers.is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+        assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));

Review comment:
       There's an `ExceptionHistoryEntryMatcher` which you could use (and 
extend) instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to