XComp commented on a change in pull request #15898:
URL: https://github.com/apache/flink/pull/15898#discussion_r659125892
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -110,6 +121,22 @@ ExecutionGraph getExecutionGraph() {
return executionGraph;
}
+ ExecutionVertex getExecutionVertex(final ExecutionVertexID executionVertexId) {
+ return executionGraph
+ .getAllVertices()
+ .get(executionVertexId.getJobVertexId())
+ .getTaskVertices()[executionVertexId.getSubtaskIndex()];
+ }
+
+ @Nullable
+ protected ExecutionVertexID getExecutionVertexId(ExecutionAttemptID id) {
+ Execution execution = getExecutionGraph().getRegisteredExecutions().get(id);
+ if (execution == null) {
Review comment:
Looks like I missed that last time: This seems to be wrong, doesn't it?
If this method returns `null`, the failure is interpreted as a global one.
This method feels like the wrong place to make that decision. I'd propose
that the method expects the ID to be present. The `null` value should then be
set explicitly in the `handleGlobalFailure` method. Alternatively, you
could follow what `DefaultScheduler`/`SchedulerBase` do: return an `Optional`
and perform the [state check](https://github.com/apache/flink/blob/d105eb32cda5f903eebc7a1019de44fdc1ec8472/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L668)
only after a successful update.
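For illustration, here is a minimal, self-contained sketch of the `Optional`-based variant. Everything in it is hypothetical. `VertexLookupSketch` is not Flink code, and plain `String` IDs stand in for `ExecutionAttemptID`/`ExecutionVertexID`. The lookup only reports presence. The global-failure path passes `null` explicitly. The task-failure path checks presence only after a successful update:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the scheduler state: plain Strings replace
// ExecutionAttemptID / ExecutionVertexID to keep the sketch self-contained.
final class VertexLookupSketch {

    // Attempt ID -> execution vertex ID, mimicking getRegisteredExecutions().
    private final Map<String, String> registeredExecutions = new HashMap<>();

    void register(String attemptId, String vertexId) {
        registeredExecutions.put(attemptId, vertexId);
    }

    // The lookup only reports presence; it does not decide "global vs. local".
    Optional<String> getExecutionVertexId(String attemptId) {
        return Optional.ofNullable(registeredExecutions.get(attemptId));
    }

    // Global failures pass null explicitly instead of relying on a failed lookup.
    String handleGlobalFailure(Throwable cause) {
        return handleAnyFailure(null, cause);
    }

    // Task failures resolve the vertex, but its presence is only asserted after
    // a successful state update (similar in spirit to the SchedulerBase check).
    String updateTaskExecutionState(String attemptId, boolean updateSucceeded, Throwable cause) {
        final Optional<String> vertexId = getExecutionVertexId(attemptId);
        if (!updateSucceeded) {
            return "ignored";
        }
        if (!vertexId.isPresent()) {
            throw new IllegalStateException("Execution " + attemptId + " is not registered");
        }
        return handleAnyFailure(vertexId.get(), cause);
    }

    // Returns a label instead of transitioning states, purely for illustration.
    private String handleAnyFailure(String failingVertexId, Throwable cause) {
        return (failingVertexId == null ? "global" : "local@" + failingVertexId)
                + ": " + cause.getMessage();
    }
}
```

With this shape, an unregistered attempt after a successful update surfaces as an error. It is no longer silently reclassified as a global failure.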
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +809,140 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
assertThat(
scheduler
- .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+ .howToHandleFailure(
+ null, new SuppressRestartsException(new Exception("test")))
.canRestart(),
is(false));
}
+ @Test
+ public void testExceptionHistoryWithGlobalFailure() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final int numAvailableSlots = 1;
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN, numAvailableSlots)),
+ taskManagerGateway);
+ });
+ taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+ final Exception expectedException = new Exception("Expected Global Exception");
+ final long start = System.currentTimeMillis();
+ final OneShotLatch latch = new OneShotLatch();
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.handleGlobalFailure(expectedException);
+ latch.trigger();
+ });
+
+ latch.await();
+
+ Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ scheduler.requestJob().getExceptionHistory();
+ final long end = System.currentTimeMillis();
+
+ assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+ RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+ assertThat(
+ failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ Matchers.is(expectedException));
+ assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+ assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+ assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+ assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+ }
+
+ @Test
+ public void testExceptionHistoryWithTaskFailure() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final int numAvailableSlots = 4;
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN, numAvailableSlots)),
+ taskManagerGateway);
+ });
+ taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+ final Exception expectedException = new Exception("local failure");
+ Iterable<ArchivedExecutionVertex> executionVertices =
+ scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+
+ ExecutionAttemptID attemptId =
+ executionVertices.iterator().next().getCurrentExecutionAttempt().getAttemptId();
+ final long start = System.currentTimeMillis();
+ final OneShotLatch latch = new OneShotLatch();
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionStateTransition(
+ new TaskExecutionState(
+ attemptId, ExecutionState.FAILED, expectedException)));
+ latch.trigger();
+ });
+
+ latch.await();
+
+ Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ scheduler.requestJob().getExceptionHistory();
+ final long end = System.currentTimeMillis();
+
+ assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+ RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+ assertThat(
+ failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ Matchers.is(expectedException));
+ assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+ assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+ assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
Review comment:
```suggestion
assertThat(failure.getTaskManagerLocation(), is(nullValue()));
```
nit: This class already statically imports `org.hamcrest.core.Is.is`, so the
`Matchers.` prefix is not necessary here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
##########
@@ -59,27 +59,48 @@
public static FailureHandlingResultSnapshot create(
FailureHandlingResult failureHandlingResult,
Function<ExecutionVertexID, Execution> latestExecutionLookup) {
+ return create(
+ failureHandlingResult.getExecutionVertexIdOfFailedTask(),
+ failureHandlingResult.getError(),
+ failureHandlingResult.getVerticesToRestart(),
+ failureHandlingResult.getTimestamp(),
+ latestExecutionLookup);
+ }
+
+ /**
+ * Creates a {@code FailureHandlingResultSnapshot} based on passed parameters.
+ *
+ * @param failingExecutionVertexId an {@link Optional} of the {@link ExecutionVertexID} the
+ * failure originates from, or {@code None}.
+ * @param rootCause the failure reason.
+ * @param concurrentVertexIds {@link ExecutionVertexID} Task vertices concurrently failing with
+ * the {@code failingExecutionVertexID}.
+ * @param timestamp the failure timestamp.
+ * @param latestExecutionLookup The look-up function for retrieving the latest {@link Execution}
+ * instance for a given {@link ExecutionVertexID}.
+ * @return The {@code FailureHandlingResultSnapshot}.
+ */
+ public static FailureHandlingResultSnapshot create(
+ Optional<ExecutionVertexID> failingExecutionVertexId,
Review comment:
Passing an `Optional` here causes unnecessary wrapping in
[StateWithExecutionGraph:325](https://github.com/apache/flink/blob/d105eb32cda5f903eebc7a1019de44fdc1ec8472/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java#L325),
only for the value to be unwrapped again in this method. Instead, we could call
`failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null)` in the
factory method above and make this parameter `@Nullable`.
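Here is a minimal sketch of that shape. All types are hypothetical, trimmed-down stand-ins: `FailureHandlingResultStub`, `SnapshotSketch`, and a local `@Nullable` replacing `javax.annotation.Nullable`. The `Optional` is unwrapped once, in the delegating factory. Callers that already hold a nullable ID pass it straight to the main factory:

```java
import java.util.Optional;

// Local stand-in for javax.annotation.Nullable so the sketch has no dependencies.
@interface Nullable {}

// Hypothetical, trimmed-down stand-in for FailureHandlingResult.
final class FailureHandlingResultStub {
    private final String failedVertexId; // null for global failures
    private final Throwable error;

    FailureHandlingResultStub(String failedVertexId, Throwable error) {
        this.failedVertexId = failedVertexId;
        this.error = error;
    }

    Optional<String> getExecutionVertexIdOfFailedTask() {
        return Optional.ofNullable(failedVertexId);
    }

    Throwable getError() {
        return error;
    }
}

// Hypothetical stand-in for FailureHandlingResultSnapshot.
final class SnapshotSketch {
    @Nullable private final String failingVertexId;
    private final Throwable rootCause;

    private SnapshotSketch(@Nullable String failingVertexId, Throwable rootCause) {
        this.failingVertexId = failingVertexId;
        this.rootCause = rootCause;
    }

    // Delegating factory: the Optional is unwrapped exactly once, here.
    static SnapshotSketch create(FailureHandlingResultStub result) {
        return create(result.getExecutionVertexIdOfFailedTask().orElse(null), result.getError());
    }

    // Main factory: a caller holding a nullable ID passes it directly,
    // without wrapping it in Optional.ofNullable(...) first.
    static SnapshotSketch create(@Nullable String failingVertexId, Throwable rootCause) {
        return new SnapshotSketch(failingVertexId, rootCause);
    }

    boolean isGlobalFailure() {
        return failingVertexId == null;
    }

    Optional<String> getFailingVertexId() {
        return Optional.ofNullable(failingVertexId);
    }

    Throwable getRootCause() {
        return rootCause;
    }
}
```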
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -81,11 +83,15 @@ public void cancel() {
@Override
public void handleGlobalFailure(Throwable cause) {
- handleAnyFailure(cause);
+ handleAnyFailure(null, cause);
}
- private void handleAnyFailure(Throwable cause) {
- final FailureResult failureResult = context.howToHandleFailure(cause);
+ private void handleAnyFailure(
+ @Nullable ExecutionVertexID failingExecutionVertexId, Throwable cause) {
+ final FailureResult failureResult =
+ context.howToHandleFailure(failingExecutionVertexId, cause);
+
+ archiveExecutionFailure(failingExecutionVertexId, cause);
Review comment:
Thinking about it once more: I guess this is not the right place to
archive the failure, considering that we also want to identify concurrent
failures. We haven't addressed that in this PR yet.
To achieve that, we have to collect the failure snapshot here and pass it
on to the next state (`Failing` or `Restarting`). Any failure that pops up in
these subsequent states has to be collected as well. The archiving should
happen when the `ExecutionGraph` is re-instantiated.
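Here is a rough, self-contained sketch of that flow. It is modelled with hypothetical classes (`PendingFailure`, `ExceptionHistorySketch`) rather than the actual `Executing`/`Restarting` states and Flink's snapshot types. The root failure is recorded but not yet archived. Failures arriving in the follow-up state are attached as concurrent ones. A single entry is archived once the `ExecutionGraph` is re-created:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model: a root failure plus failures that arrive while the job
// is already failing/restarting (i.e. concurrent failures).
final class PendingFailure {
    private final Throwable root;
    private final List<Throwable> concurrent = new ArrayList<>();

    PendingFailure(Throwable root) {
        this.root = root;
    }

    void addConcurrent(Throwable cause) {
        concurrent.add(cause);
    }

    Throwable getRoot() {
        return root;
    }

    List<Throwable> getConcurrent() {
        return Collections.unmodifiableList(concurrent);
    }
}

final class ExceptionHistorySketch {
    private final List<PendingFailure> archive = new ArrayList<>();
    private PendingFailure pending; // non-null while failing/restarting

    // "Executing": collect the failure, but do not archive it yet.
    void onFailureWhileExecuting(Throwable cause) {
        pending = new PendingFailure(cause);
    }

    // "Restarting"/"Failing": later failures are attached to the pending root.
    void onFailureWhileRestarting(Throwable cause) {
        if (pending == null) {
            throw new IllegalStateException("No pending root failure");
        }
        pending.addConcurrent(cause);
    }

    // Archive exactly once, when the ExecutionGraph is re-instantiated.
    void onExecutionGraphRecreated() {
        if (pending != null) {
            archive.add(pending);
            pending = null;
        }
    }

    List<PendingFailure> getArchive() {
        return Collections.unmodifiableList(archive);
    }
}
```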
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -81,11 +83,15 @@ public void cancel() {
@Override
public void handleGlobalFailure(Throwable cause) {
- handleAnyFailure(cause);
+ handleAnyFailure(null, cause);
}
- private void handleAnyFailure(Throwable cause) {
- final FailureResult failureResult = context.howToHandleFailure(cause);
+ private void handleAnyFailure(
+ @Nullable ExecutionVertexID failingExecutionVertexId, Throwable cause) {
+ final FailureResult failureResult =
+ context.howToHandleFailure(failingExecutionVertexId, cause);
+
+ archiveExecutionFailure(failingExecutionVertexId, cause);
Review comment:
We should also cover this in the corresponding
`StateWithExecutionGraphTest` test implementations.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -178,9 +192,13 @@ private void completeOperationAndGoToFinished(String savepoint) {
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
}
- private void handleAnyFailure(Throwable cause) {
+ private void handleAnyFailure(
Review comment:
Thanks for sharing your thoughts. You're right: Tackling the failure
handling for the `AdaptiveScheduler` might be a bit out of scope for this
ticket. Some of the next steps include making the failure handling of the
`AdaptiveScheduler` more sophisticated, which might be the better place to
fix that. Let's focus on the exception history for now.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +801,140 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
assertThat(
scheduler
- .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+ .howToHandleFailure(
+ null, new SuppressRestartsException(new Exception("test")))
.canRestart(),
is(false));
}
+ @Test
+ public void testExceptionHistoryWithGlobalFailure() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final int numAvailableSlots = 1;
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN, numAvailableSlots)),
+ taskManagerGateway);
+ });
+ taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+ final Exception expectedException = new Exception("Expected Global Exception");
+ final long start = System.currentTimeMillis();
+ final CountDownLatch latch = new CountDownLatch(1);
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.handleGlobalFailure(expectedException);
+ latch.countDown();
+ });
+
+ latch.await();
+
+ Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ scheduler.requestJob().getExceptionHistory();
+ final long end = System.currentTimeMillis();
+
+ assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+ RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+ assertThat(
+ failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ Matchers.is(expectedException));
+ assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+ assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+ assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+ assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+ }
+
+ @Test
+ public void testExceptionHistoryWithTaskFailure() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final int numAvailableSlots = 4;
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN, numAvailableSlots)),
+ taskManagerGateway);
+ });
+ taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+ final Exception expectedException = new Exception("local failure");
+ Iterable<ArchivedExecutionVertex> executionVertices =
+ scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+
+ ExecutionAttemptID attemptId =
+ executionVertices.iterator().next().getCurrentExecutionAttempt().getAttemptId();
+ final long start = System.currentTimeMillis();
+ final CountDownLatch latch = new CountDownLatch(1);
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionStateTransition(
+ new TaskExecutionState(
+ attemptId, ExecutionState.FAILED, expectedException)));
+ latch.countDown();
+ });
+
+ latch.await();
+
+ Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ scheduler.requestJob().getExceptionHistory();
+ final long end = System.currentTimeMillis();
+
+ assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+ RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+ assertThat(
+ failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ Matchers.is(expectedException));
+ assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+ assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+ assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+ assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
Review comment:
I think that's a good idea. That makes the test more precise as well.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -763,7 +771,8 @@ public void testHowToHandleFailureRejectedByStrategy() throws Exception {
.setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
.build();
- assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart(), is(false));
+ assertThat(
+ scheduler.howToHandleFailure(null, new Exception("test")).canRestart(), is(false));
Review comment:
Thanks for addressing this. Could you move this case into its own test
method?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -108,7 +114,10 @@
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
Review comment:
nit: I know it's not caused by you, but could you replace the
`org.junit.Assert.assertThat` import with `org.hamcrest.MatcherAssert.assertThat`
in a hotfix commit, just to get rid of the deprecation warning?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -293,9 +305,19 @@ void goToFailing(
static final class FailureResult {
@Nullable private final Duration backoffTime;
+ /**
+ * the {@link ExecutionVertexID} refering to the {@link ExecutionVertex} the failure is
Review comment:
```suggestion
* The {@link ExecutionVertexID} referring to the {@link ExecutionVertex} the failure is
```
nit
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -346,5 +393,12 @@ abstract boolean updateTaskExecutionState(
* Finished} state
*/
void goToFinished(ArchivedExecutionGraph archivedExecutionGraph);
+
+ /**
+ * Archive the details of an execution failure for future retrieval and inspection.
+ *
+ * @param failureHandlingResultSnapshot
Review comment:
```suggestion
* @param failureHandlingResultSnapshot The {@link FailureHandlingResultSnapshot} holding the failure information that needs to be archived.
```
nit: just to please the IDE and remove a warning.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -792,11 +809,140 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
assertThat(
scheduler
- .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+ .howToHandleFailure(
+ null, new SuppressRestartsException(new Exception("test")))
.canRestart(),
is(false));
}
+ @Test
+ public void testExceptionHistoryWithGlobalFailure() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final int numAvailableSlots = 1;
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN, numAvailableSlots)),
+ taskManagerGateway);
+ });
+ taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+ final Exception expectedException = new Exception("Expected Global Exception");
+ final long start = System.currentTimeMillis();
+ final OneShotLatch latch = new OneShotLatch();
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.handleGlobalFailure(expectedException);
+ latch.trigger();
+ });
+
+ latch.await();
+
+ Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ scheduler.requestJob().getExceptionHistory();
+ final long end = System.currentTimeMillis();
+
+ assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+ RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+ assertThat(
+ failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ Matchers.is(expectedException));
+ assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+ assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+ assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+ assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+ }
+
+ @Test
+ public void testExceptionHistoryWithTaskFailure() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final int numAvailableSlots = 4;
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN, numAvailableSlots)),
+ taskManagerGateway);
+ });
+ taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+
+ final Exception expectedException = new Exception("local failure");
+ Iterable<ArchivedExecutionVertex> executionVertices =
+ scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+
+ ExecutionAttemptID attemptId =
+ executionVertices.iterator().next().getCurrentExecutionAttempt().getAttemptId();
+ final long start = System.currentTimeMillis();
+ final OneShotLatch latch = new OneShotLatch();
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionStateTransition(
+ new TaskExecutionState(
+ attemptId, ExecutionState.FAILED, expectedException)));
+ latch.trigger();
+ });
+
+ latch.await();
+
+ Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ scheduler.requestJob().getExceptionHistory();
+ final long end = System.currentTimeMillis();
+
+ assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1));
+
+ RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
+
+ assertThat(
+ failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ Matchers.is(expectedException));
+ assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+ assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+ assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+ assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
+ }
+
Review comment:
Can you add the concurrent failure test here as well? This should fail
right now since we're not covering the failure archiving in the restart state.
--
This is an automated message from the Apache Git Service.