metaswirl commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r808982055



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
##########
@@ -56,16 +57,32 @@ public BoundedFIFOQueue(int maxSize) {
      * Adds an element to the end of the queue. An element will be removed 
from the head of the
      * queue if the queue would exceed its maximum size by adding the new 
element.
      *
-     * @param element The element that should be added to the end of the queue.
+     * @param t The element that should be added to the end of the queue.
      * @throws NullPointerException If {@code null} is passed as an element.
      */
-    public void add(T element) {
-        Preconditions.checkNotNull(element);
-        if (elements.add(element) && elements.size() > maxSize) {
+    //    @Override
+    public boolean add(T t) {
+        Preconditions.checkNotNull(t);
+        if (elements.add(t) && elements.size() > maxSize) {
             elements.poll();
         }
+        return true;
     }
 
+    public ArrayList<T> toArrayList() {
+        return new ArrayList<>(elements);
+    }
+
+    //    @Override
+    //    public T poll() {

Review comment:
       fixed

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +983,329 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
                 new AdaptiveSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
 
         assertThat(
-                scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new 
Exception("test")))
-                        .canRestart(),
-                is(false));
+                        scheduler
+                                .howToHandleFailure(
+                                        new SuppressRestartsException(new 
Exception("test")))
+                                .canRestart())
+                .isFalse();
+    }
+
+    static class RunFailedJobListener implements JobStatusListener {
+        OneShotLatch jobRunning;
+        OneShotLatch jobTerminal;
+
+        public RunFailedJobListener() {
+            this.jobRunning = new OneShotLatch();
+            this.jobTerminal = new OneShotLatch();
+        }
+
+        @Override
+        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long 
timestamp) {
+            if (newJobStatus == JobStatus.RUNNING) {
+                jobRunning.trigger();
+            }
+            if (!jobRunning.isTriggered()) {
+                return;
+            }
+            // CREATED is the job status used the AdaptiveScheduler state 
WAITING FOR RESOURCES
+            if (newJobStatus == JobStatus.FAILED || newJobStatus == 
JobStatus.CREATED) {
+                jobTerminal.trigger();
+            }
+        }
+
+        public void waitForRunning() throws InterruptedException {
+            jobRunning.await();
+        }
+
+        public void waitForTerminal() throws InterruptedException {
+            jobTerminal.await();
+        }
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> 
testLogic)
+            throws Exception {
+        return runExceptionHistoryTests(testLogic, ignored -> {}, ignored -> 
{});
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> 
testLogic,
+            Consumer<AdaptiveSchedulerBuilder> setupScheduler)
+            throws Exception {
+        return runExceptionHistoryTests(testLogic, setupScheduler, ignored -> 
{});
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> 
testLogic,
+            Consumer<AdaptiveSchedulerBuilder> setupScheduler,
+            Consumer<JobGraph> setupJobGraph)
+            throws Exception {
+        final int numAvailableSlots = 4;
+        final JobGraph jobGraph = createJobGraph();
+        setupJobGraph.accept(jobGraph);
+        RunFailedJobListener listener = new RunFailedJobListener();
+        List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
+
+        final CompletedCheckpointStore completedCheckpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final CheckpointIDCounter checkpointIDCounter = new 
StandaloneCheckpointIDCounter();
+        final CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
+        TestingCheckpointRecoveryFactory checkpointRecoveryFactory =
+                new TestingCheckpointRecoveryFactory(completedCheckpointStore, 
checkpointIDCounter);
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        AdaptiveSchedulerBuilder builder =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        
.setCheckpointRecoveryFactory(checkpointRecoveryFactory)
+                        .setCheckpointCleaner(checkpointCleaner)
+                        .setJobStatusListener(listener);
+        setupScheduler.accept(builder);
+        final AdaptiveScheduler scheduler = builder.build();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+        taskManagerGateway.setCancelConsumer(cancelledTasks::add);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        listener.waitForRunning();
+
+        final Iterable<ArchivedExecutionVertex> executionVertices =
+                
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();
+        final List<ExecutionAttemptID> attemptIds =
+                IterableUtils.toStream(executionVertices)
+                        
.map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
+                        .map(ArchivedExecution::getAttemptId)
+                        .collect(Collectors.toList());
+        Runnable runnable = testLogic.apply(scheduler, attemptIds);
+        CompletableFuture<Void> runTestLogicFuture =
+                CompletableFuture.runAsync(runnable, 
singleThreadMainThreadExecutor);
+        runTestLogicFuture.get();

Review comment:
       fixed

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +983,329 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
                 new AdaptiveSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
 
         assertThat(
-                scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new 
Exception("test")))
-                        .canRestart(),
-                is(false));
+                        scheduler
+                                .howToHandleFailure(
+                                        new SuppressRestartsException(new 
Exception("test")))
+                                .canRestart())
+                .isFalse();
+    }
+
+    static class RunFailedJobListener implements JobStatusListener {
+        OneShotLatch jobRunning;
+        OneShotLatch jobTerminal;
+
+        public RunFailedJobListener() {
+            this.jobRunning = new OneShotLatch();
+            this.jobTerminal = new OneShotLatch();
+        }
+
+        @Override
+        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long 
timestamp) {
+            if (newJobStatus == JobStatus.RUNNING) {
+                jobRunning.trigger();
+            }
+            if (!jobRunning.isTriggered()) {
+                return;
+            }
+            // CREATED is the job status used the AdaptiveScheduler state 
WAITING FOR RESOURCES
+            if (newJobStatus == JobStatus.FAILED || newJobStatus == 
JobStatus.CREATED) {
+                jobTerminal.trigger();
+            }
+        }
+
+        public void waitForRunning() throws InterruptedException {
+            jobRunning.await();
+        }
+
+        public void waitForTerminal() throws InterruptedException {
+            jobTerminal.await();
+        }
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> 
testLogic)
+            throws Exception {
+        return runExceptionHistoryTests(testLogic, ignored -> {}, ignored -> 
{});
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> 
testLogic,
+            Consumer<AdaptiveSchedulerBuilder> setupScheduler)
+            throws Exception {
+        return runExceptionHistoryTests(testLogic, setupScheduler, ignored -> 
{});
+    }
+
+    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
+            BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> 
testLogic,
+            Consumer<AdaptiveSchedulerBuilder> setupScheduler,
+            Consumer<JobGraph> setupJobGraph)
+            throws Exception {
+        final int numAvailableSlots = 4;
+        final JobGraph jobGraph = createJobGraph();
+        setupJobGraph.accept(jobGraph);
+        RunFailedJobListener listener = new RunFailedJobListener();
+        List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
+
+        final CompletedCheckpointStore completedCheckpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final CheckpointIDCounter checkpointIDCounter = new 
StandaloneCheckpointIDCounter();
+        final CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
+        TestingCheckpointRecoveryFactory checkpointRecoveryFactory =
+                new TestingCheckpointRecoveryFactory(completedCheckpointStore, 
checkpointIDCounter);
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        AdaptiveSchedulerBuilder builder =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        
.setCheckpointRecoveryFactory(checkpointRecoveryFactory)
+                        .setCheckpointCleaner(checkpointCleaner)
+                        .setJobStatusListener(listener);
+        setupScheduler.accept(builder);
+        final AdaptiveScheduler scheduler = builder.build();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
+        taskManagerGateway.setCancelConsumer(cancelledTasks::add);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
+        listener.waitForRunning();
+
+        final Iterable<ArchivedExecutionVertex> executionVertices =
+                
scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices();

Review comment:
       fixed

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -969,10 +983,329 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
                 new AdaptiveSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
 
         assertThat(
-                scheduler
-                        .howToHandleFailure(new SuppressRestartsException(new 
Exception("test")))
-                        .canRestart(),
-                is(false));
+                        scheduler
+                                .howToHandleFailure(
+                                        new SuppressRestartsException(new 
Exception("test")))
+                                .canRestart())
+                .isFalse();
+    }
+
+    static class RunFailedJobListener implements JobStatusListener {
+        OneShotLatch jobRunning;
+        OneShotLatch jobTerminal;
+
+        public RunFailedJobListener() {
+            this.jobRunning = new OneShotLatch();
+            this.jobTerminal = new OneShotLatch();
+        }
+
+        @Override
+        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long 
timestamp) {
+            if (newJobStatus == JobStatus.RUNNING) {
+                jobRunning.trigger();
+            }
+            if (!jobRunning.isTriggered()) {
+                return;
+            }
+            // CREATED is the job status used the AdaptiveScheduler state 
WAITING FOR RESOURCES

Review comment:
       Not sure whether the job reaches the RUNNING status again, since we 
don't offer any new resources/slots.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -1067,20 +1094,20 @@ public void onFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
     }
 
     @Override
-    public Executing.FailureResult howToHandleFailure(Throwable failure) {
-        if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
-            return Executing.FailureResult.canNotRestart(
-                    new JobException("The failure is not recoverable", 
failure));
+    public FailureResult howToHandleFailure(Throwable cause) {
+        if (ExecutionFailureHandler.isUnrecoverableError(cause)) {
+            return FailureResult.canNotRestart(
+                    new JobException("The failure is not recoverable", cause));
         }
 
-        restartBackoffTimeStrategy.notifyFailure(failure);
+        restartBackoffTimeStrategy.notifyFailure(cause);
         if (restartBackoffTimeStrategy.canRestart()) {
-            return Executing.FailureResult.canRestart(

Review comment:
       fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to