dmvk commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805205602
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +979,203 @@ public void testHowToHandleFailureUnrecoverableFailure()
throws Exception {
new AdaptiveSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
assertThat(
- scheduler
- .howToHandleFailure(new SuppressRestartsException(new
Exception("test")))
- .canRestart(),
- is(false));
+ scheduler
+ .howToHandleFailure(
+ new SuppressRestartsException(new
Exception("test")))
+ .canRestart())
+ .isFalse();
+ }
+
+ static class RunFailedJobListener implements JobStatusListener {
+ OneShotLatch jobRunning;
+ OneShotLatch jobFailed;
+
+ public RunFailedJobListener() {
+ this.jobRunning = new OneShotLatch();
+ this.jobFailed = new OneShotLatch();
+ }
+
+ @Override
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long
timestamp) {
+ if (newJobStatus == JobStatus.RUNNING) {
+ jobRunning.trigger();
+ }
+ if (newJobStatus == JobStatus.FAILED) {
+ jobFailed.trigger();
+ }
+ }
+
+ public void waitForRunning() throws InterruptedException {
+ jobRunning.await();
+ }
+
+ public void waitForFailed() throws InterruptedException {
+ jobFailed.await();
+ }
+ }
+
+ private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+ int numAvailableSlots,
+ BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable>
testLogic)
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ RunFailedJobListener listener = new RunFailedJobListener();
+ List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph,
singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .setJobStatusListener(listener)
+ .build();
+
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+ taskManagerGateway.setCancelConsumer(cancelledTasks::add);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN,
numAvailableSlots)),
+ taskManagerGateway);
+ });
+ listener.waitForRunning();
+
+ final Iterable<ArchivedExecutionVertex> executionVertices =
+
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+ final List<ExecutionAttemptID> attemptIds =
+ IterableUtils.toStream(executionVertices)
+
.map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
+ .map(ArchivedExecution::getAttemptId)
+ .collect(Collectors.toList());
+ Runnable runnable = testLogic.apply(scheduler, attemptIds);
+ CompletableFuture<Void> runTestLogicFuture =
+ CompletableFuture.runAsync(runnable,
singleThreadMainThreadExecutor);
+ runTestLogicFuture.get();
+
+ Consumer<ExecutionAttemptID> canceller =
+ attemptId ->
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionStateTransition(
+ new TaskExecutionState(
+ attemptId,
ExecutionState.CANCELED, null)));
+ CompletableFuture<Void> cancelFuture =
+ CompletableFuture.runAsync(
+ () -> cancelledTasks.forEach(canceller),
singleThreadMainThreadExecutor);
+ cancelFuture.get();
+ listener.waitForFailed();
+
+ return scheduler.requestJob().getExceptionHistory();
+ }
+
+ @Test
+ public void testExceptionHistoryWithGlobalFailure() throws Exception {
+ final Exception expectedException = new Exception("Expected Global
Exception");
+ BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable>
testLogic =
+ (scheduler, attemptIds) -> {
+ final ExecutionAttemptID attemptId = attemptIds.remove(0);
+
+ return () -> {
+ scheduler.handleGlobalFailure(expectedException);
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionStateTransition(
+ new TaskExecutionState(
+ attemptId,
ExecutionState.CANCELED, null)));
+ };
+ };
+
+ final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ runExceptionHistoryTests(1, testLogic);
+
+ assertThat(actualExceptionHistory).hasSize(1);
+
+ final RootExceptionHistoryEntry failure =
actualExceptionHistory.iterator().next();
+ assertThat(failure.getTaskManagerLocation()).isNull();
+ assertThat(failure.getFailingTaskName()).isNull();
+
+ assertThat(failure.getException().deserializeError(classLoader))
+ .isEqualTo(expectedException);
+ }
+
+ @Test
+ public void testExceptionHistoryWithTaskFailure() throws Exception {
+ final Exception expectedException = new Exception("Expected Local
Exception");
+ final int numAvailableSlots = 4;
+ BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable>
testLogic =
+ (scheduler, attemptIds) -> {
+ final ExecutionAttemptID attemptId = attemptIds.get(1);
+
+ return () ->
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionStateTransition(
+ new TaskExecutionState(
+ attemptId,
+ ExecutionState.FAILED,
+ expectedException)));
+ };
+
+ final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ runExceptionHistoryTests(numAvailableSlots, testLogic);
+
+ assertThat(actualExceptionHistory).hasSize(1);
+
+ final RootExceptionHistoryEntry failure =
actualExceptionHistory.iterator().next();
+
+ assertThat(failure.getException().deserializeError(classLoader))
+ .isEqualTo(expectedException);
+ }
+
+ @Test
+ public void testExceptionHistoryWithTaskConcurrentFailure() throws
Exception {
Review comment:
This makes me think that this is actually a bug in `Execution`. On a
failure in `CANCELING` state, I'd expect the `Execution` to transition into the
`FAILING` state instead of `CANCELED`.
This is something we might want to dig deeper into, but we should do that in
another story, so we don't block this effort.
If I understand this correctly, the DefaultScheduler now simply ignores any
failures to the `CANCELLING` state (from the exception history perspective).
If the above assumption is correct, I'd suggest we do the same (simply not
storing these updates into `failureCollection`) and follow up on this by fixing
the Execution state machine (which should IMO fix it for both schedulers).
Hope I didn't miss something obvious, the `Execution` code is bit confusing.
WDYT?
cc @zentol
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -148,6 +162,33 @@ public Logger getLogger() {
return logger;
}
+ protected Throwable extractError(TaskExecutionStateTransition
taskExecutionStateTransition) {
+ Throwable cause =
taskExecutionStateTransition.getError(userCodeClassLoader);
+ if (cause == null) {
+ cause = new FlinkException("Unknown failure cause. Probably
related to FLINK-21376.");
+ }
+ return cause;
+ }
+
+ protected Optional<ExecutionVertexID> extractExecutionVertexID(
+ TaskExecutionStateTransition taskExecutionStateTransition) {
+ return
executionGraph.getExecutionVertexId(taskExecutionStateTransition.getID());
+ }
+
+ protected static Optional<RootExceptionHistoryEntry> convertFailures(
+ Function<ExecutionVertexID, Optional<ExecutionVertex>> lookup,
Review comment:
the method no longer exists
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -148,6 +162,33 @@ public Logger getLogger() {
return logger;
}
+ protected Throwable extractError(TaskExecutionStateTransition
taskExecutionStateTransition) {
+ Throwable cause =
taskExecutionStateTransition.getError(userCodeClassLoader);
+ if (cause == null) {
+ cause = new FlinkException("Unknown failure cause. Probably
related to FLINK-21376.");
+ }
+ return cause;
+ }
+
+ protected Optional<ExecutionVertexID> extractExecutionVertexID(
+ TaskExecutionStateTransition taskExecutionStateTransition) {
+ return
executionGraph.getExecutionVertexId(taskExecutionStateTransition.getID());
+ }
+
+ protected static Optional<RootExceptionHistoryEntry> convertFailures(
+ Function<ExecutionVertexID, Optional<ExecutionVertex>> lookup,
Review comment:
There is a similar method in the `Failure` class. I guess giving the
parameter a more descriptive name would do the trick
```
/**
* Convert the failure into an {@link ExceptionHistoryEntry}.
*
* @param findExecutionFn Function that looks up additional information
on the {@link
* ExecutionVertexID}, which can be used for constructing the {@link
ExceptionHistoryEntry}.
* @return The {@link ExceptionHistoryEntry} derived for this failure.
*/
public abstract ExceptionHistoryEntry toExceptionHistoryEntry(
Function<ExecutionVertexID, Execution> findExecutionFn);
```
One additional thought about naming getters, it's good to distinguish
between getters that return an actual value (`getXXX`) and getters that return
an optional (`findXXX`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]