metaswirl commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805326141



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +979,203 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
                 new AdaptiveSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
 
         assertThat(
-                scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new 
Exception("test")))
-                        .canRestart(),
-                is(false));
+                        scheduler
+                                .howToHandleFailure(
+                                        new SuppressRestartsException(new 
Exception("test")))
+                                .canRestart())
+                .isFalse();
+    }
+
+    static class RunFailedJobListener implements JobStatusListener {
+        OneShotLatch jobRunning;
+        OneShotLatch jobFailed;
+
+        public RunFailedJobListener() {
+            this.jobRunning = new OneShotLatch();
+            this.jobFailed = new OneShotLatch();
+        }
+
+        @Override
+        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long 
timestamp) {
+            if (newJobStatus == JobStatus.RUNNING) {
+                jobRunning.trigger();
+            }
+            if (newJobStatus == JobStatus.FAILED) {
+                jobFailed.trigger();
+            }
+        }
+
+        public void waitForRunning() throws InterruptedException {
+            jobRunning.await();
+        }
+
+        public void waitForFailed() throws InterruptedException {
+            jobFailed.await();
+        }
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            int numAvailableSlots,
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> 
testLogic)
+            throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        RunFailedJobListener listener = new RunFailedJobListener();
+        List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobStatusListener(listener)
+                        .build();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+        taskManagerGateway.setCancelConsumer(cancelledTasks::add);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        listener.waitForRunning();
+
+        final Iterable<ArchivedExecutionVertex> executionVertices =
+                
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+        final List<ExecutionAttemptID> attemptIds =
+                IterableUtils.toStream(executionVertices)
+                        
.map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
+                        .map(ArchivedExecution::getAttemptId)
+                        .collect(Collectors.toList());
+        Runnable runnable = testLogic.apply(scheduler, attemptIds);
+        CompletableFuture<Void> runTestLogicFuture =
+                CompletableFuture.runAsync(runnable, 
singleThreadMainThreadExecutor);
+        runTestLogicFuture.get();
+
+        Consumer<ExecutionAttemptID> canceller =
+                attemptId ->
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionStateTransition(
+                                        new TaskExecutionState(
+                                                attemptId, 
ExecutionState.CANCELED, null)));
+        CompletableFuture<Void> cancelFuture =
+                CompletableFuture.runAsync(
+                        () -> cancelledTasks.forEach(canceller), 
singleThreadMainThreadExecutor);
+        cancelFuture.get();
+        listener.waitForFailed();
+
+        return scheduler.requestJob().getExceptionHistory();
+    }
+
+    @Test
+    public void testExceptionHistoryWithGlobalFailure() throws Exception {
+        final Exception expectedException = new Exception("Expected Global 
Exception");
+        BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> 
testLogic =
+                (scheduler, attemptIds) -> {
+                    final ExecutionAttemptID attemptId = attemptIds.remove(0);
+
+                    return () -> {
+                        scheduler.handleGlobalFailure(expectedException);
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionStateTransition(
+                                        new TaskExecutionState(
+                                                attemptId, 
ExecutionState.CANCELED, null)));
+                    };
+                };
+
+        final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                runExceptionHistoryTests(1, testLogic);
+
+        assertThat(actualExceptionHistory).hasSize(1);
+
+        final RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+        assertThat(failure.getTaskManagerLocation()).isNull();
+        assertThat(failure.getFailingTaskName()).isNull();
+
+        assertThat(failure.getException().deserializeError(classLoader))
+                .isEqualTo(expectedException);
+    }
+
+    @Test
+    public void testExceptionHistoryWithTaskFailure() throws Exception {
+        final Exception expectedException = new Exception("Expected Local 
Exception");
+        final int numAvailableSlots = 4;
+        BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> 
testLogic =
+                (scheduler, attemptIds) -> {
+                    final ExecutionAttemptID attemptId = attemptIds.get(1);
+
+                    return () ->
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionStateTransition(
+                                            new TaskExecutionState(
+                                                    attemptId,
+                                                    ExecutionState.FAILED,
+                                                    expectedException)));
+                };
+
+        final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                runExceptionHistoryTests(numAvailableSlots, testLogic);
+
+        assertThat(actualExceptionHistory).hasSize(1);
+
+        final RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
+        assertThat(failure.getException().deserializeError(classLoader))
+                .isEqualTo(expectedException);
+    }
+
+    @Test
+    public void testExceptionHistoryWithTaskConcurrentFailure() throws 
Exception {

Review comment:
       > If I understand this correctly, the DefaultScheduler now simply 
ignores any failures to the CANCELLING state
   
   Yes, that's also how I read it.
   
   > I'd suggest we do the same
   
   Sounds good.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -148,6 +162,33 @@ public Logger getLogger() {
         return logger;
     }
 
+    protected Throwable extractError(TaskExecutionStateTransition 
taskExecutionStateTransition) {
+        Throwable cause = 
taskExecutionStateTransition.getError(userCodeClassLoader);
+        if (cause == null) {
+            cause = new FlinkException("Unknown failure cause. Probably 
related to FLINK-21376.");
+        }
+        return cause;
+    }
+
+    protected Optional<ExecutionVertexID> extractExecutionVertexID(
+            TaskExecutionStateTransition taskExecutionStateTransition) {
+        return 
executionGraph.getExecutionVertexId(taskExecutionStateTransition.getID());
+    }
+
+    protected static Optional<RootExceptionHistoryEntry> convertFailures(
+            Function<ExecutionVertexID, Optional<ExecutionVertex>> lookup,

Review comment:
       Using `find` instead of `get` sounds good.
   
   As for the doc string, I personally don't find it any clearer than 
the method signature.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to