XComp commented on a change in pull request #15898:
URL: https://github.com/apache/flink/pull/15898#discussion_r649872391
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -145,14 +151,14 @@
*/
public class AdaptiveScheduler
implements SchedulerNG,
- Created.Context,
- WaitingForResources.Context,
- CreatingExecutionGraph.Context,
- Executing.Context,
- Restarting.Context,
- Failing.Context,
- Finished.Context,
- StopWithSavepoint.Context {
+ Created.Context,
+ WaitingForResources.Context,
+ CreatingExecutionGraph.Context,
+ Executing.Context,
+ Restarting.Context,
+ Failing.Context,
+ Finished.Context,
+ StopWithSavepoint.Context {
Review comment:
Sorry for the late response. I am still busy with other stuff. I hope to
get back to you today or on Monday.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -59,27 +59,49 @@
public static FailureHandlingResultSnapshot create(
FailureHandlingResult failureHandlingResult,
Function<ExecutionVertexID, Execution> latestExecutionLookup) {
+ return create(
+ failureHandlingResult.getExecutionVertexIdOfFailedTask(),
+ failureHandlingResult.getError(),
+ failureHandlingResult.getVerticesToRestart(),
+ failureHandlingResult.getTimestamp(),
+ latestExecutionLookup);
+ }
+
+ /**
+ * Creates a {@code FailureHandlingResultSnapshot} based on the passed {@link
+ * FailureHandlingResult} and {@link ExecutionVertex ExecutionVertices}.
Review comment:
Looks like a copy&paste error for the JavaDoc. :-)
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +801,140 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
assertThat(
scheduler
- .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+ .howToHandleFailure(
+ null, new SuppressRestartsException(new Exception("test")))
.canRestart(),
is(false));
}
+ @Test
+ public void testExceptionHistoryWithGlobalFailure() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final int numAvailableSlots = 1;
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN, numAvailableSlots)),
+ taskManagerGateway);
+ });
+ taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+ final Exception expectedException = new Exception("Expected Global Exception");
+ final long start = System.currentTimeMillis();
+ final CountDownLatch latch = new CountDownLatch(1);
Review comment:
FYI: Alternatively, you could use the `OneShotLatch` here.
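For illustration, a minimal sketch of that variant, reusing the test setup quoted above (only the latch-related lines change):
```java
import org.apache.flink.core.testutils.OneShotLatch;

// ... inside the test, replacing the CountDownLatch:
final OneShotLatch latch = new OneShotLatch();
singleThreadMainThreadExecutor.execute(
        () -> {
            scheduler.handleGlobalFailure(expectedException);
            latch.trigger();
        });

latch.await();
```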
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
##########
@@ -74,8 +74,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
-import java.util.function.Function;
Review comment:
`Function` is still used in this test class (in
`MockExecutionJobVertex`) and shouldn't be removed.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -880,6 +891,19 @@ public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
transitionToState(new Finished.Factory(this, archivedExecutionGraph, LOG));
}
+ @Override
+ public void archiveFailure(FailureHandlingResultSnapshot failureHandlingResultSnapshot) {
+ exceptionHistory.add(
+ RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(
+ failureHandlingResultSnapshot));
+ }
+
+ private Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
+ final Collection<RootExceptionHistoryEntry> copy = new ArrayList<>(exceptionHistory.size());
+ exceptionHistory.forEach(copy::add);
+ return copy;
Review comment:
We could think of moving this logic into `BoundedFIFOQueue` considering
that `SchedulerBase` uses the exact same code. WDYT?
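As a rough sketch of that idea (the method name `toArrayList` is made up; that `BoundedFIFOQueue` exposes `size()` and `forEach` is taken from the code above):
```java
// Hypothetical addition to BoundedFIFOQueue<T> (requires java.util.ArrayList/List imports):
// both scheduler implementations could then call exceptionHistory.toArrayList()
// instead of copying the entries manually.
public List<T> toArrayList() {
    final List<T> copy = new ArrayList<>(size());
    forEach(copy::add);
    return copy;
}
```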
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -293,9 +316,19 @@ void goToFailing(
static final class FailureResult {
Review comment:
I'm wondering whether we should introduce a unit test for
`FailureResult` considering that it becomes more "powerful". Also, maybe moving
it into `AdaptiveScheduler` would make sense? WDYT?
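Just to sketch the idea of such a test; the factory methods and accessors used below (`canRestart(...)`, `canNotRestart(...)`, `canRestart()`, `getFailureCause()`) are assumptions about the current shape of `FailureResult` and would need to be aligned with the actual API:
```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.time.Duration;

/** Hypothetical unit test outline for {@code Executing.FailureResult}. */
public class FailureResultTest extends TestLogger {

    @Test
    public void testRestartableFailure() {
        final Throwable cause = new Exception("expected");
        // Assumed factory: a restartable result carrying the cause and a backoff time.
        final Executing.FailureResult result =
                Executing.FailureResult.canRestart(cause, Duration.ZERO);

        assertThat(result.canRestart(), is(true));
        assertThat(result.getFailureCause(), is(cause));
    }

    @Test
    public void testUnrecoverableFailure() {
        final Throwable cause = new Exception("expected");
        // Assumed factory: a non-restartable result only carrying the cause.
        final Executing.FailureResult result = Executing.FailureResult.canNotRestart(cause);

        assertThat(result.canRestart(), is(false));
        assertThat(result.getFailureCause(), is(cause));
    }
}
```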
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -763,7 +771,8 @@ public void testHowToHandleFailureRejectedByStrategy() throws Exception {
.setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
.build();
- assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart(), is(false));
+ assertThat(
+ scheduler.howToHandleFailure(null, new Exception("test")).canRestart(), is(false));
Review comment:
Theoretically, we would have to test passing a non-null value here as
well for the `failingExecutionVertexId` parameter. Introducing a
`FailureResultTest` as mentioned above would free us from doing that.
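For completeness, the non-null variant could look roughly like this (the `ExecutionVertexID` is constructed arbitrarily, purely for illustration, and the required imports are assumed to be available in the test class):
```java
// Hypothetical counterpart of the assertion above for the non-null case; a
// dedicated FailureResultTest would make this duplication unnecessary.
final ExecutionVertexID failingVertexId = new ExecutionVertexID(new JobVertexID(), 0);
assertThat(
        scheduler.howToHandleFailure(failingVertexId, new Exception("test")).canRestart(),
        is(false));
```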
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -122,6 +132,15 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState
return successfulUpdate;
}
+ @Nullable
+ private ExecutionVertexID getExcutionVertexId(ExecutionAttemptID id) {
+ Execution execution = getExecutionGraph().getRegisteredExecutions().get(id);
+ if (execution == null) {
+ return null;
+ }
+ return execution.getVertex().getID();
+ }
+
Review comment:
I think we could move this method into `StateWithExecutionGraph`. It
feels like a utility method accessing the `ExecutionGraph`. That would also
remove the code redundancy.
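A sketch of the moved method, assuming `StateWithExecutionGraph` already gives its subclasses access to `getExecutionGraph()`:
```java
// Candidate shared helper in StateWithExecutionGraph, replacing the private copies
// in Executing and StopWithSavepoint.
@Nullable
ExecutionVertexID getExecutionVertexId(ExecutionAttemptID id) {
    final Execution execution = getExecutionGraph().getRegisteredExecutions().get(id);
    return execution == null ? null : execution.getVertex().getID();
}
```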
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -122,6 +132,15 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState
return successfulUpdate;
}
+ @Nullable
+ private ExecutionVertexID getExcutionVertexId(ExecutionAttemptID id) {
Review comment:
```suggestion
private ExecutionVertexID getExecutionVertexId(ExecutionAttemptID id) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -178,9 +192,13 @@ private void completeOperationAndGoToFinished(String savepoint) {
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
}
- private void handleAnyFailure(Throwable cause) {
+ private void handleAnyFailure(
Review comment:
I'm still puzzled by the `handleAnyFailure` method (I know that this is
not directly related to your change), but it feels wrong to have this code
redundancy between `Executing` and `StopWithSavepoint`. We hesitated to let
`StopWithSavepoint` inherit from `Executing` in the first place, since it would
make the code harder to read.
One other option is to move it out of the state implementations into
`AdaptiveScheduler`, although I feel like handling a failure is semantically the
responsibility of the state. Just to trigger a discussion here:
what are your thoughts on that one?
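Not meant as actual Flink code, just a toy illustration of that middle ground: the shared sequence lives in one place while each state remains the entry point that decides when a failure has to be handled. All names below are invented.
```java
// Toy model only: a common supertype (or shared helper) owns the failure-handling
// sequence once, so Executing- and StopWithSavepoint-like states stop duplicating it.
interface FailureHandlingState {

    /** Decides whether the failure is recoverable; mirrors howToHandleFailure. */
    boolean canRestartAfter(Throwable cause);

    void restart();

    void fail(Throwable cause);

    /** Shared logic written once; each concrete state calls this when a failure surfaces. */
    default void handleAnyFailure(Throwable cause) {
        if (canRestartAfter(cause)) {
            restart();
        } else {
            fail(cause);
        }
    }
}
```
The same effect could also be achieved with a package-private helper in `AdaptiveScheduler`; the sketch only tries to show that the duplication can go away without `StopWithSavepoint` inheriting from `Executing`.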
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +801,140 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
assertThat(
scheduler
- .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+ .howToHandleFailure(
+ null, new SuppressRestartsException(new Exception("test")))
.canRestart(),
is(false));
}
+ @Test
+ public void testExceptionHistoryWithGlobalFailure() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final int numAvailableSlots = 1;
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN, numAvailableSlots)),
+ taskManagerGateway);
+ });
+ taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+ final Exception expectedException = new Exception("Expected Global Exception");
+ final long start = System.currentTimeMillis();
+ final CountDownLatch latch = new CountDownLatch(1);
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.handleGlobalFailure(expectedException);
+ latch.countDown();
+ });
+
+ latch.await();
+
+ Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ scheduler.requestJob().getExceptionHistory();
+ final long end = System.currentTimeMillis();
+
+ assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+ RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+ assertThat(
+ failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ Matchers.is(expectedException));
+ assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+ assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+ assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+ assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+ }
+
+ @Test
+ public void testExceptionHistoryWithTaskFailure() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final int numAvailableSlots = 4;
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN, numAvailableSlots)),
+ taskManagerGateway);
+ });
+ taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+ final Exception expectedException = new Exception("local failure");
+ Iterable<ArchivedExecutionVertex> executionVertices =
+ scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+
+ ExecutionAttemptID attemptId =
+ executionVertices.iterator().next().getCurrentExecutionAttempt().getAttemptId();
+ final long start = System.currentTimeMillis();
+ final CountDownLatch latch = new CountDownLatch(1);
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionStateTransition(
+ new TaskExecutionState(
+ attemptId, ExecutionState.FAILED, expectedException)));
+ latch.countDown();
+ });
+
+ latch.await();
+
+ Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ scheduler.requestJob().getExceptionHistory();
+ final long end = System.currentTimeMillis();
+
+ assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+ RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+ assertThat(
+ failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ Matchers.is(expectedException));
+ assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+ assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+ assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+ assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
Review comment:
There's an `ExceptionHistoryEntryMatcher` which you could use (and
extend) instead.
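For illustration only; the factory name and parameter order below are assumptions about `ExceptionHistoryEntryMatcher` and need to be checked against the actual class. The start/end timestamp bounds would likely stay as separate assertions, since a matcher comparing an exact timestamp cannot express a range:
```java
// Hedged sketch replacing the exception/location/task-name assertions above.
assertThat(
        failure,
        ExceptionHistoryEntryMatcher.matchesFailure(
                expectedException, failure.getTimestamp(), null, null));
```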
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]