XComp commented on a change in pull request #15640:
URL: https://github.com/apache/flink/pull/15640#discussion_r615683397



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -1126,6 +1133,94 @@ public void testExceptionHistoryWithRestartableFailure() 
{
                                 failingException, 
updateStateTriggeringJobFailureTimestamp)));
     }
 
+    @Ignore
+    @Test
+    public void testExceptionHistoryConcurrentRestart() throws Exception {
+        final JobGraph jobGraph = singleJobVertexJobGraph(2);
+
+        final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+        final TestingLogicalSlotBuilder logicalSlotBuilder = new 
TestingLogicalSlotBuilder();
+        logicalSlotBuilder.setTaskManagerLocation(taskManagerLocation);
+
+        executionSlotAllocatorFactory = new 
TestExecutionSlotAllocatorFactory(logicalSlotBuilder);
+
+        final ReorganizableManuallyTriggeredScheduledExecutor delayExecutor =
+                new ReorganizableManuallyTriggeredScheduledExecutor();
+        final TestFailoverStrategyFactory failoverStrategyFactory =
+                new TestFailoverStrategyFactory();
+        final DefaultScheduler scheduler =
+                createScheduler(
+                        jobGraph,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new PipelinedRegionSchedulingStrategy.Factory(),
+                        failoverStrategyFactory,
+                        delayExecutor);
+        scheduler.startScheduling();
+
+        final ExecutionVertex executionVertex0 =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0);
+        final ExecutionVertex executionVertex1 =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 1);
+
+        // single-ExecutionVertex failure
+        final RuntimeException exception0 = new RuntimeException("failure #0");
+        failoverStrategyFactory.setTasksToRestart(executionVertex0.getID());
+        final long updateStateTriggeringRestartTimestamp0 =
+                initiateFailure(
+                        scheduler,
+                        
executionVertex0.getCurrentExecutionAttempt().getAttemptId(),
+                        exception0);
+
+        // multi-ExecutionVertex failure
+        final RuntimeException exception1 = new RuntimeException("failure #1");
+        failoverStrategyFactory.setTasksToRestart(
+                executionVertex1.getID(), executionVertex0.getID());
+        final long updateStateTriggeringRestartTimestamp1 =
+                initiateFailure(
+                        scheduler,
+                        
executionVertex1.getCurrentExecutionAttempt().getAttemptId(),
+                        exception1);
+
+        // there might be a race condition with the delayExecutor if the tasks 
are scheduled quite
+        // close to each other which we want to simulate here
+        Collections.reverse(delayExecutor.getCollectedScheduledTasks());
+
+        delayExecutor.triggerNonPeriodicScheduledTasks();

Review comment:
       Would you have suggested to pick the `ScheduledTask` right after it was 
issued through `initiateFailure` and set the order of the `ScheduledTasks` 
explicitly?!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to