zentol commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r808412091
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
##########
@@ -56,16 +57,32 @@ public BoundedFIFOQueue(int maxSize) {
* Adds an element to the end of the queue. An element will be removed
from the head of the
* queue if the queue would exceed its maximum size by adding the new
element.
*
- * @param element The element that should be added to the end of the queue.
+ * @param t The element that should be added to the end of the queue.
* @throws NullPointerException If {@code null} is passed as an element.
*/
- public void add(T element) {
- Preconditions.checkNotNull(element);
- if (elements.add(element) && elements.size() > maxSize) {
+ // @Override
+ public boolean add(T t) {
+ Preconditions.checkNotNull(t);
+ if (elements.add(t) && elements.size() > maxSize) {
elements.poll();
}
+ return true;
}
+ public ArrayList<T> toArrayList() {
+ return new ArrayList<>(elements);
+ }
+
+ // @Override
+ // public T poll() {
Review comment:
Please clean this up: remove the commented-out `@Override` annotations and the commented-out `poll()` override.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -306,22 +327,87 @@ void deliverOperatorEventToCoordinator(
operatorId, request);
}
+ /** Transition to different state when failure occurs. Stays in the same
state by default. */
+ abstract void onFailure(Throwable cause);
+
+ <T extends StateTransitions.ToRestarting & StateTransitions.ToFailing>
void restartOrFail(
Review comment:
I'm a bit uncomfortable with adding a method to the parent class that is
only usable by some of the subclasses.
I understand the goal of sharing the code, but this could also be achieved
with a static utility method.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +983,329 @@ public void testHowToHandleFailureUnrecoverableFailure()
throws Exception {
new AdaptiveSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
assertThat(
- scheduler
- .howToHandleFailure(new SuppressRestartsException(new
Exception("test")))
- .canRestart(),
- is(false));
+ scheduler
+ .howToHandleFailure(
+ new SuppressRestartsException(new
Exception("test")))
+ .canRestart())
+ .isFalse();
+ }
+
+ static class RunFailedJobListener implements JobStatusListener {
+ OneShotLatch jobRunning;
+ OneShotLatch jobTerminal;
+
+ public RunFailedJobListener() {
+ this.jobRunning = new OneShotLatch();
+ this.jobTerminal = new OneShotLatch();
+ }
+
+ @Override
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long
timestamp) {
+ if (newJobStatus == JobStatus.RUNNING) {
+ jobRunning.trigger();
+ }
+ if (!jobRunning.isTriggered()) {
+ return;
+ }
+ // CREATED is the job status used the AdaptiveScheduler state
WAITING FOR RESOURCES
+ if (newJobStatus == JobStatus.FAILED || newJobStatus ==
JobStatus.CREATED) {
+ jobTerminal.trigger();
+ }
+ }
+
+ public void waitForRunning() throws InterruptedException {
+ jobRunning.await();
+ }
+
+ public void waitForTerminal() throws InterruptedException {
+ jobTerminal.await();
+ }
+ }
+
+ private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+ BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable>
testLogic)
+ throws Exception {
+ return runExceptionHistoryTests(testLogic, ignored -> {}, ignored ->
{});
+ }
+
+ private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+ BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable>
testLogic,
+ Consumer<AdaptiveSchedulerBuilder> setupScheduler)
+ throws Exception {
+ return runExceptionHistoryTests(testLogic, setupScheduler, ignored ->
{});
+ }
+
+ private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+ BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable>
testLogic,
+ Consumer<AdaptiveSchedulerBuilder> setupScheduler,
+ Consumer<JobGraph> setupJobGraph)
+ throws Exception {
+ final int numAvailableSlots = 4;
+ final JobGraph jobGraph = createJobGraph();
+ setupJobGraph.accept(jobGraph);
+ RunFailedJobListener listener = new RunFailedJobListener();
+ List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
+
+ final CompletedCheckpointStore completedCheckpointStore =
+ new StandaloneCompletedCheckpointStore(1);
+ final CheckpointIDCounter checkpointIDCounter = new
StandaloneCheckpointIDCounter();
+ final CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
+ TestingCheckpointRecoveryFactory checkpointRecoveryFactory =
+ new TestingCheckpointRecoveryFactory(completedCheckpointStore,
checkpointIDCounter);
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
Duration.ofMillis(1L));
+
+ AdaptiveSchedulerBuilder builder =
+ new AdaptiveSchedulerBuilder(jobGraph,
singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+
.setCheckpointRecoveryFactory(checkpointRecoveryFactory)
+ .setCheckpointCleaner(checkpointCleaner)
+ .setJobStatusListener(listener);
+ setupScheduler.accept(builder);
+ final AdaptiveScheduler scheduler = builder.build();
+
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+ taskManagerGateway.setCancelConsumer(cancelledTasks::add);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN,
numAvailableSlots)),
+ taskManagerGateway);
+ });
+ listener.waitForRunning();
+
+ final Iterable<ArchivedExecutionVertex> executionVertices =
+
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
Review comment:
Since you're using a dedicated singleThreadMainThreadExecutor, you must
never call into the scheduler directly from the test thread; calls such as
`scheduler.requestJob()` have to be executed on that executor.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -1067,20 +1094,20 @@ public void onFinished(ArchivedExecutionGraph
archivedExecutionGraph) {
}
@Override
- public Executing.FailureResult howToHandleFailure(Throwable failure) {
- if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
- return Executing.FailureResult.canNotRestart(
- new JobException("The failure is not recoverable",
failure));
+ public FailureResult howToHandleFailure(Throwable cause) {
+ if (ExecutionFailureHandler.isUnrecoverableError(cause)) {
+ return FailureResult.canNotRestart(
+ new JobException("The failure is not recoverable", cause));
}
- restartBackoffTimeStrategy.notifyFailure(failure);
+ restartBackoffTimeStrategy.notifyFailure(cause);
if (restartBackoffTimeStrategy.canRestart()) {
- return Executing.FailureResult.canRestart(
Review comment:
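This refactoring (renaming `failure` to `cause` and dropping the `Executing.` qualifier from `FailureResult`) should have been a separate commit.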
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -936,12 +940,24 @@ public void testConsistentMaxParallelism() throws
Exception {
@Test
public void testHowToHandleFailureRejectedByStrategy() throws Exception {
+ JobGraph jobGraph = createJobGraph();
final AdaptiveScheduler scheduler =
- new AdaptiveSchedulerBuilder(createJobGraph(),
mainThreadExecutor)
+ new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
.setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
.build();
- assertThat(scheduler.howToHandleFailure(new
Exception("test")).canRestart(), is(false));
+ assertThat(scheduler.howToHandleFailure(new
Exception("test")).canRestart()).isFalse();
+ }
+
+ @Test
+ public void testHowToHandleFailureRejectedByStrategyWithNull() throws
Exception {
Review comment:
I don't understand the name; what is null in this test?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -140,12 +147,14 @@
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
TEST_EXECUTOR_RESOURCE.getExecutor());
+ private final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+
@Test
public void testInitialState() throws Exception {
final AdaptiveScheduler scheduler =
new AdaptiveSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
- assertThat(scheduler.getState(), instanceOf(Created.class));
+ assertThat(scheduler.getState()).isInstanceOf(Created.class);
Review comment:
This switch of assertion style (from Hamcrest matchers to AssertJ) should have been a separate commit.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +983,329 @@ public void testHowToHandleFailureUnrecoverableFailure()
throws Exception {
new AdaptiveSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
assertThat(
- scheduler
- .howToHandleFailure(new SuppressRestartsException(new
Exception("test")))
- .canRestart(),
- is(false));
+ scheduler
+ .howToHandleFailure(
+ new SuppressRestartsException(new
Exception("test")))
+ .canRestart())
+ .isFalse();
+ }
+
+ static class RunFailedJobListener implements JobStatusListener {
+ OneShotLatch jobRunning;
+ OneShotLatch jobTerminal;
+
+ public RunFailedJobListener() {
+ this.jobRunning = new OneShotLatch();
+ this.jobTerminal = new OneShotLatch();
+ }
+
+ @Override
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long
timestamp) {
+ if (newJobStatus == JobStatus.RUNNING) {
+ jobRunning.trigger();
+ }
+ if (!jobRunning.isTriggered()) {
+ return;
+ }
+ // CREATED is the job status used the AdaptiveScheduler state
WAITING FOR RESOURCES
Review comment:
Alternatively, you could simply wait until the job is RUNNING for a second time.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +983,329 @@ public void testHowToHandleFailureUnrecoverableFailure()
throws Exception {
new AdaptiveSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
assertThat(
- scheduler
- .howToHandleFailure(new SuppressRestartsException(new
Exception("test")))
- .canRestart(),
- is(false));
+ scheduler
+ .howToHandleFailure(
+ new SuppressRestartsException(new
Exception("test")))
+ .canRestart())
+ .isFalse();
+ }
+
+ static class RunFailedJobListener implements JobStatusListener {
+ OneShotLatch jobRunning;
+ OneShotLatch jobTerminal;
+
+ public RunFailedJobListener() {
+ this.jobRunning = new OneShotLatch();
+ this.jobTerminal = new OneShotLatch();
+ }
+
+ @Override
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long
timestamp) {
+ if (newJobStatus == JobStatus.RUNNING) {
+ jobRunning.trigger();
+ }
+ if (!jobRunning.isTriggered()) {
+ return;
+ }
+ // CREATED is the job status used the AdaptiveScheduler state
WAITING FOR RESOURCES
+ if (newJobStatus == JobStatus.FAILED || newJobStatus ==
JobStatus.CREATED) {
+ jobTerminal.trigger();
+ }
+ }
+
+ public void waitForRunning() throws InterruptedException {
+ jobRunning.await();
+ }
+
+ public void waitForTerminal() throws InterruptedException {
+ jobTerminal.await();
+ }
+ }
+
+ private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+ BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable>
testLogic)
+ throws Exception {
+ return runExceptionHistoryTests(testLogic, ignored -> {}, ignored ->
{});
+ }
+
+ private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+ BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable>
testLogic,
+ Consumer<AdaptiveSchedulerBuilder> setupScheduler)
+ throws Exception {
+ return runExceptionHistoryTests(testLogic, setupScheduler, ignored ->
{});
+ }
+
+ private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+ BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable>
testLogic,
+ Consumer<AdaptiveSchedulerBuilder> setupScheduler,
+ Consumer<JobGraph> setupJobGraph)
+ throws Exception {
+ final int numAvailableSlots = 4;
+ final JobGraph jobGraph = createJobGraph();
+ setupJobGraph.accept(jobGraph);
+ RunFailedJobListener listener = new RunFailedJobListener();
+ List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
+
+ final CompletedCheckpointStore completedCheckpointStore =
+ new StandaloneCompletedCheckpointStore(1);
+ final CheckpointIDCounter checkpointIDCounter = new
StandaloneCheckpointIDCounter();
+ final CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
+ TestingCheckpointRecoveryFactory checkpointRecoveryFactory =
+ new TestingCheckpointRecoveryFactory(completedCheckpointStore,
checkpointIDCounter);
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
Duration.ofMillis(1L));
+
+ AdaptiveSchedulerBuilder builder =
+ new AdaptiveSchedulerBuilder(jobGraph,
singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+
.setCheckpointRecoveryFactory(checkpointRecoveryFactory)
+ .setCheckpointCleaner(checkpointCleaner)
+ .setJobStatusListener(listener);
+ setupScheduler.accept(builder);
+ final AdaptiveScheduler scheduler = builder.build();
+
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+ taskManagerGateway.setCancelConsumer(cancelledTasks::add);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN,
numAvailableSlots)),
+ taskManagerGateway);
+ });
+ listener.waitForRunning();
+
+ final Iterable<ArchivedExecutionVertex> executionVertices =
+
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+ final List<ExecutionAttemptID> attemptIds =
+ IterableUtils.toStream(executionVertices)
+
.map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
+ .map(ArchivedExecution::getAttemptId)
+ .collect(Collectors.toList());
+ Runnable runnable = testLogic.apply(scheduler, attemptIds);
+ CompletableFuture<Void> runTestLogicFuture =
+ CompletableFuture.runAsync(runnable,
singleThreadMainThreadExecutor);
+ runTestLogicFuture.get();
Review comment:
It seems overly complicated to make the test logic return a `Runnable`. `testLogic`
could be a `BiConsumer` instead.
```suggestion
        CompletableFuture.runAsync(
                        () -> testLogic.accept(scheduler, attemptIds),
                        singleThreadMainThreadExecutor)
                .get();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]