zentol commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r808412091



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
##########
@@ -56,16 +57,32 @@ public BoundedFIFOQueue(int maxSize) {
      * Adds an element to the end of the queue. An element will be removed from the head of the
      * queue if the queue would exceed its maximum size by adding the new element.
      *
-     * @param element The element that should be added to the end of the queue.
+     * @param t The element that should be added to the end of the queue.
      * @throws NullPointerException If {@code null} is passed as an element.
      */
-    public void add(T element) {
-        Preconditions.checkNotNull(element);
-        if (elements.add(element) && elements.size() > maxSize) {
+    //    @Override
+    public boolean add(T t) {
+        Preconditions.checkNotNull(t);
+        if (elements.add(t) && elements.size() > maxSize) {
             elements.poll();
         }
+        return true;
     }
 
+    public ArrayList<T> toArrayList() {
+        return new ArrayList<>(elements);
+    }
+
+    //    @Override
+    //    public T poll() {

Review comment:
       please clean this up
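For illustration, here is a minimal sketch of a cleaned-up bounded FIFO queue. It assumes the semantics in the Javadoc of this hunk: evict from the head once `maxSize` is exceeded, and reject `null`. The class name `BoundedFifoQueueSketch` and its `toList()` helper are hypothetical. This is not Flink's `BoundedFIFOQueue`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/** Hypothetical bounded FIFO queue: once maxSize is exceeded, the oldest element is evicted. */
final class BoundedFifoQueueSketch<T> implements Iterable<T> {
    private final int maxSize;
    private final ArrayDeque<T> elements;

    BoundedFifoQueueSketch(int maxSize) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("maxSize must not be negative: " + maxSize);
        }
        this.maxSize = maxSize;
        this.elements = new ArrayDeque<>();
    }

    /**
     * Appends an element to the tail; if that pushes the size past maxSize, the head is removed.
     *
     * @throws NullPointerException if element is null
     */
    void add(T element) {
        Objects.requireNonNull(element, "element");
        elements.addLast(element);
        if (elements.size() > maxSize) {
            elements.pollFirst(); // evict the oldest entry
        }
    }

    int size() {
        return elements.size();
    }

    /** Returns a copy, so callers cannot modify the internal deque. */
    List<T> toList() {
        return new ArrayList<>(elements);
    }

    @Override
    public Iterator<T> iterator() {
        return elements.iterator();
    }
}
```

Returning a copy from `toList()` mirrors the intent of the PR's `toArrayList()` without exposing the internal deque. It also leaves no commented-out `@Override` stubs behind.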

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -306,22 +327,87 @@ void deliverOperatorEventToCoordinator(
                 operatorId, request);
     }
 
+    /** Transition to different state when failure occurs. Stays in the same state by default. */
+    abstract void onFailure(Throwable cause);
+
+    <T extends StateTransitions.ToRestarting & StateTransitions.ToFailing> void restartOrFail(

Review comment:
       I'm a bit uncomfortable with adding a method to the parent class that is only usable by some of the subclasses.
   
   I get the goal of sharing the code, but this could also be achieved with a static utility method.
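Here is a minimal sketch of the static-utility alternative. The transition interfaces (`ToRestarting`, `ToFailing`), their method names, and the simplified `FailureResult` are hypothetical stand-ins; the real Flink signatures differ. The intersection bound moves from an instance method on the shared parent to a static helper, so only contexts that implement both transitions can call it.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical static helper replacing an instance method on the shared parent state. */
final class RestartOrFailSketch {
    /** Stand-in for a context that can move the job into a restarting state. */
    interface ToRestarting {
        void goToRestarting(Duration backoffTime);
    }

    /** Stand-in for a context that can move the job into a failing state. */
    interface ToFailing {
        void goToFailing(Throwable cause);
    }

    /** Simplified failure decision: either restart after a backoff or fail with a cause. */
    static final class FailureResult {
        private final Duration backoffTime; // null if the job cannot restart
        private final Throwable failureCause;

        private FailureResult(Duration backoffTime, Throwable failureCause) {
            this.backoffTime = backoffTime;
            this.failureCause = failureCause;
        }

        static FailureResult canRestart(Duration backoffTime) {
            return new FailureResult(Objects.requireNonNull(backoffTime), null);
        }

        static FailureResult canNotRestart(Throwable cause) {
            return new FailureResult(null, Objects.requireNonNull(cause));
        }

        boolean canRestart() {
            return backoffTime != null;
        }
    }

    private RestartOrFailSketch() {}

    /**
     * The intersection bound means only contexts implementing BOTH transitions compile here,
     * so states without a Restarting/Failing transition never see this method.
     */
    static <T extends ToRestarting & ToFailing> void restartOrFail(
            FailureResult result, T context) {
        if (result.canRestart()) {
            context.goToRestarting(result.backoffTime);
        } else {
            context.goToFailing(result.failureCause);
        }
    }
}
```

Each state that supports both transitions then calls `RestartOrFailSketch.restartOrFail(result, context)`. The parent class no longer carries a method that most subclasses cannot use.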

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +983,329 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
                 new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
 
         assertThat(
-                scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
-                        .canRestart(),
-                is(false));
+                        scheduler
+                                .howToHandleFailure(
+                                        new SuppressRestartsException(new Exception("test")))
+                                .canRestart())
+                .isFalse();
+    }
+
+    static class RunFailedJobListener implements JobStatusListener {
+        OneShotLatch jobRunning;
+        OneShotLatch jobTerminal;
+
+        public RunFailedJobListener() {
+            this.jobRunning = new OneShotLatch();
+            this.jobTerminal = new OneShotLatch();
+        }
+
+        @Override
+        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
+            if (newJobStatus == JobStatus.RUNNING) {
+                jobRunning.trigger();
+            }
+            if (!jobRunning.isTriggered()) {
+                return;
+            }
+            // CREATED is the job status used the AdaptiveScheduler state WAITING FOR RESOURCES
+            if (newJobStatus == JobStatus.FAILED || newJobStatus == JobStatus.CREATED) {
+                jobTerminal.trigger();
+            }
+        }
+
+        public void waitForRunning() throws InterruptedException {
+            jobRunning.await();
+        }
+
+        public void waitForTerminal() throws InterruptedException {
+            jobTerminal.await();
+        }
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> testLogic)
+            throws Exception {
+        return runExceptionHistoryTests(testLogic, ignored -> {}, ignored -> {});
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> testLogic,
+            Consumer<AdaptiveSchedulerBuilder> setupScheduler)
+            throws Exception {
+        return runExceptionHistoryTests(testLogic, setupScheduler, ignored -> {});
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> testLogic,
+            Consumer<AdaptiveSchedulerBuilder> setupScheduler,
+            Consumer<JobGraph> setupJobGraph)
+            throws Exception {
+        final int numAvailableSlots = 4;
+        final JobGraph jobGraph = createJobGraph();
+        setupJobGraph.accept(jobGraph);
+        RunFailedJobListener listener = new RunFailedJobListener();
+        List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
+
+        final CompletedCheckpointStore completedCheckpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final CheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
+        final CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
+        TestingCheckpointRecoveryFactory checkpointRecoveryFactory =
+                new TestingCheckpointRecoveryFactory(completedCheckpointStore, checkpointIDCounter);
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+        AdaptiveSchedulerBuilder builder =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setCheckpointRecoveryFactory(checkpointRecoveryFactory)
+                        .setCheckpointCleaner(checkpointCleaner)
+                        .setJobStatusListener(listener);
+        setupScheduler.accept(builder);
+        final AdaptiveScheduler scheduler = builder.build();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+        taskManagerGateway.setCancelConsumer(cancelledTasks::add);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        listener.waitForRunning();
+
+        final Iterable<ArchivedExecutionVertex> executionVertices =
+                scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();

Review comment:
       Since you're using a dedicated singleThreadMainThreadExecutor, you must never call directly into the scheduler.
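Here is a minimal sketch of the pattern the comment asks for. A plain `Executor` stands in for `singleThreadMainThreadExecutor`, and `FakeScheduler` is a hypothetical, non-thread-safe component that checks its calling thread. Every read or write of scheduler state is submitted to the executor, and the test thread only blocks on the returned future. The sketch assumes a single-threaded executor, so the thread that created the scheduler is the only one that ever runs its tasks. In the real test this would roughly correspond to wrapping `scheduler.requestJob()` in `CompletableFuture.supplyAsync(..., singleThreadMainThreadExecutor)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical illustration of confining all scheduler access to its main-thread executor. */
final class MainThreadAccessSketch {
    /** Stand-in for a component that is NOT thread-safe and checks its calling thread. */
    static final class FakeScheduler {
        private final Thread mainThread;
        private int startCount; // only read/written on mainThread

        FakeScheduler(Thread mainThread) {
            this.mainThread = mainThread;
        }

        private void assertRunningInMainThread() {
            if (Thread.currentThread() != mainThread) {
                throw new IllegalStateException("called outside of the main thread");
            }
        }

        void startScheduling() {
            assertRunningInMainThread();
            startCount++;
        }

        int requestStartCount() {
            assertRunningInMainThread();
            return startCount;
        }
    }

    private MainThreadAccessSketch() {}

    /** Creates the scheduler on the executor so the executor's thread becomes its main thread. */
    static FakeScheduler createOn(Executor mainThreadExecutor) {
        return CompletableFuture.supplyAsync(
                        () -> new FakeScheduler(Thread.currentThread()), mainThreadExecutor)
                .join();
    }

    /** Reads scheduler state from the test thread without touching the scheduler directly. */
    static int queryOnMainThread(FakeScheduler scheduler, Executor mainThreadExecutor) {
        // join() blocks the calling thread; failures surface as CompletionException
        return CompletableFuture.supplyAsync(scheduler::requestStartCount, mainThreadExecutor)
                .join();
    }
}
```

A direct call such as `scheduler.requestStartCount()` from the test thread fails fast here. Without such a check, the same mistake would silently race with the main thread.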

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -1067,20 +1094,20 @@ public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
     }
 
     @Override
-    public Executing.FailureResult howToHandleFailure(Throwable failure) {
-        if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
-            return Executing.FailureResult.canNotRestart(
-                    new JobException("The failure is not recoverable", failure));
+    public FailureResult howToHandleFailure(Throwable cause) {
+        if (ExecutionFailureHandler.isUnrecoverableError(cause)) {
+            return FailureResult.canNotRestart(
+                    new JobException("The failure is not recoverable", cause));
         }
 
-        restartBackoffTimeStrategy.notifyFailure(failure);
+        restartBackoffTimeStrategy.notifyFailure(cause);
         if (restartBackoffTimeStrategy.canRestart()) {
-            return Executing.FailureResult.canRestart(

Review comment:
       This change should've been a separate commit.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -936,12 +940,24 @@ public void testConsistentMaxParallelism() throws Exception {
 
     @Test
     public void testHowToHandleFailureRejectedByStrategy() throws Exception {
+        JobGraph jobGraph = createJobGraph();
         final AdaptiveScheduler scheduler =
-                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
                         .setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
                         .build();
 
-        assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart(), is(false));
+        assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart()).isFalse();
+    }
+
+    @Test
+    public void testHowToHandleFailureRejectedByStrategyWithNull() throws Exception {

Review comment:
       I don't get the name; what is null in this test?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -140,12 +147,14 @@
             ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                     TEST_EXECUTOR_RESOURCE.getExecutor());
 
+    private final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+
     @Test
     public void testInitialState() throws Exception {
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
 
-        assertThat(scheduler.getState(), instanceOf(Created.class));
+        assertThat(scheduler.getState()).isInstanceOf(Created.class);

Review comment:
       Should've been a separate commit.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +983,329 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
                 new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
 
         assertThat(
-                scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
-                        .canRestart(),
-                is(false));
+                        scheduler
+                                .howToHandleFailure(
+                                        new SuppressRestartsException(new Exception("test")))
+                                .canRestart())
+                .isFalse();
+    }
+
+    static class RunFailedJobListener implements JobStatusListener {
+        OneShotLatch jobRunning;
+        OneShotLatch jobTerminal;
+
+        public RunFailedJobListener() {
+            this.jobRunning = new OneShotLatch();
+            this.jobTerminal = new OneShotLatch();
+        }
+
+        @Override
+        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
+            if (newJobStatus == JobStatus.RUNNING) {
+                jobRunning.trigger();
+            }
+            if (!jobRunning.isTriggered()) {
+                return;
+            }
+            // CREATED is the job status used the AdaptiveScheduler state WAITING FOR RESOURCES

Review comment:
       You could alternatively just wait until the job is running a second time.
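Here is a minimal sketch of that alternative. A simplified `Status` enum stands in for Flink's `JobStatus`, and the method names are hypothetical. The listener counts RUNNING transitions and waits for the second one, so the test no longer relies on `CREATED` being the status reported for Waiting for Resources. A test that expects the job to end in FAILED would still need a separate latch for that case.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical listener that detects a restart by waiting for a second RUNNING transition. */
final class RunningTwiceListenerSketch {
    /** Simplified stand-in for Flink's JobStatus. */
    enum Status { CREATED, RUNNING, RESTARTING, FAILED }

    private final CountDownLatch firstRunning = new CountDownLatch(1);
    private final CountDownLatch secondRunning = new CountDownLatch(2);

    /** Called on every status change; only RUNNING transitions are counted. */
    void statusChanged(Status newStatus) {
        if (newStatus == Status.RUNNING) {
            firstRunning.countDown();
            secondRunning.countDown();
        }
    }

    boolean awaitRunning(long timeout, TimeUnit unit) {
        return await(firstRunning, timeout, unit);
    }

    /** True once RUNNING has been reached twice, i.e. the job came back after a restart. */
    boolean awaitRunningAgain(long timeout, TimeUnit unit) {
        return await(secondRunning, timeout, unit);
    }

    private static boolean await(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }
}
```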

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +983,329 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
                 new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
 
         assertThat(
-                scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
-                        .canRestart(),
-                is(false));
+                        scheduler
+                                .howToHandleFailure(
+                                        new SuppressRestartsException(new Exception("test")))
+                                .canRestart())
+                .isFalse();
+    }
+
+    static class RunFailedJobListener implements JobStatusListener {
+        OneShotLatch jobRunning;
+        OneShotLatch jobTerminal;
+
+        public RunFailedJobListener() {
+            this.jobRunning = new OneShotLatch();
+            this.jobTerminal = new OneShotLatch();
+        }
+
+        @Override
+        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
+            if (newJobStatus == JobStatus.RUNNING) {
+                jobRunning.trigger();
+            }
+            if (!jobRunning.isTriggered()) {
+                return;
+            }
+            // CREATED is the job status used the AdaptiveScheduler state WAITING FOR RESOURCES
+            if (newJobStatus == JobStatus.FAILED || newJobStatus == JobStatus.CREATED) {
+                jobTerminal.trigger();
+            }
+        }
+
+        public void waitForRunning() throws InterruptedException {
+            jobRunning.await();
+        }
+
+        public void waitForTerminal() throws InterruptedException {
+            jobTerminal.await();
+        }
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> testLogic)
+            throws Exception {
+        return runExceptionHistoryTests(testLogic, ignored -> {}, ignored -> {});
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> testLogic,
+            Consumer<AdaptiveSchedulerBuilder> setupScheduler)
+            throws Exception {
+        return runExceptionHistoryTests(testLogic, setupScheduler, ignored -> {});
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> testLogic,
+            Consumer<AdaptiveSchedulerBuilder> setupScheduler,
+            Consumer<JobGraph> setupJobGraph)
+            throws Exception {
+        final int numAvailableSlots = 4;
+        final JobGraph jobGraph = createJobGraph();
+        setupJobGraph.accept(jobGraph);
+        RunFailedJobListener listener = new RunFailedJobListener();
+        List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
+
+        final CompletedCheckpointStore completedCheckpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final CheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
+        final CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
+        TestingCheckpointRecoveryFactory checkpointRecoveryFactory =
+                new TestingCheckpointRecoveryFactory(completedCheckpointStore, checkpointIDCounter);
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+        AdaptiveSchedulerBuilder builder =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setCheckpointRecoveryFactory(checkpointRecoveryFactory)
+                        .setCheckpointCleaner(checkpointCleaner)
+                        .setJobStatusListener(listener);
+        setupScheduler.accept(builder);
+        final AdaptiveScheduler scheduler = builder.build();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+        taskManagerGateway.setCancelConsumer(cancelledTasks::add);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        listener.waitForRunning();
+
+        final Iterable<ArchivedExecutionVertex> executionVertices =
+                scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+        final List<ExecutionAttemptID> attemptIds =
+                IterableUtils.toStream(executionVertices)
+                        .map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
+                        .map(ArchivedExecution::getAttemptId)
+                        .collect(Collectors.toList());
+        Runnable runnable = testLogic.apply(scheduler, attemptIds);
+        CompletableFuture<Void> runTestLogicFuture =
+                CompletableFuture.runAsync(runnable, singleThreadMainThreadExecutor);
+        runTestLogicFuture.get();

Review comment:
       It seems overly complicated to make the test logic return a Runnable; testLogic could be a BiConsumer instead.
   ```suggestion
           CompletableFuture.runAsync(
                   () -> testLogic.accept(scheduler, attemptIds), singleThreadMainThreadExecutor)
               .get();
   ```
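Here is a minimal sketch of the `BiConsumer` variant. Generic type parameters stand in for `AdaptiveScheduler` and `ExecutionAttemptID` so that it compiles on its own, and the helper name `runOnMainThread` is hypothetical. The consumer runs directly on the executor, so the extra step of building and returning a `Runnable` disappears.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

/** Hypothetical helper showing the BiConsumer variant of the test-logic hook. */
final class BiConsumerTestLogicSketch {
    private BiConsumerTestLogicSketch() {}

    /**
     * Runs testLogic on the main-thread executor and blocks until it finishes.
     * S stands in for AdaptiveScheduler, A for ExecutionAttemptID.
     */
    static <S, A> void runOnMainThread(
            BiConsumer<S, List<A>> testLogic,
            S scheduler,
            List<A> attemptIds,
            Executor mainThreadExecutor) {
        CompletableFuture.runAsync(() -> testLogic.accept(scheduler, attemptIds), mainThreadExecutor)
                .join(); // a failure inside testLogic is rethrown as CompletionException
    }
}
```

Using `join()` rather than `get()` here only avoids checked exceptions in the sketch. The review suggestion itself uses `get()`, which works equally well inside a test method declared `throws Exception`.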




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
