GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327163946
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##########
 @@ -75,10 +137,281 @@ public DefaultScheduler(
                        slotRequestTimeout,
                        shuffleMaster,
                        partitionTracker);
+
+               this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+               this.slotRequestTimeout = slotRequestTimeout;
+               this.slotProvider = slotProvider;
+               this.delayExecutor = delayExecutor;
+               this.userCodeLoader = userCodeLoader;
+               this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory);
+               this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory);
+               this.executionVertexOperations = checkNotNull(executionVertexOperations);
+               this.executionVertexVersioner = executionVertexVersioner;
+               this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner);
+       }
+
+       // ------------------------------------------------------------------------
+       // SchedulerNG
+       // ------------------------------------------------------------------------
+
+       @Override
+       public void startSchedulingInternal() {
+               initializeScheduling();
+               schedulingStrategy.startScheduling();
+       }
+
+       private void initializeScheduling() {
+               executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+               schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+               executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+               setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID()));
+               prepareExecutionGraphForScheduling();
+       }
+
+       @Override
+       public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+               final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+               if (executionVertexIdOptional.isPresent()) {
+                       final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+                       updateState(taskExecutionState);
+                       schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+                       maybeHandleTaskFailure(taskExecutionState, executionVertexId);
 
 Review comment:
   @zhuzhurk
   
   I gave this some thought. 
   
   > when we invoke maybeHandleTaskFailure right after invoking 
schedulingStrategy.onExecutionStateChange, the task state may even have changed 
in the call stack chain so that we are doing failover handling in an unexpected 
state
   
   If we added `getMainThreadExecutor().execute()` to 
`allocateSlotsAndDeploy()`, I don't think it would solve this issue: we might 
still re-deploy tasks that have already been re-deployed. 
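
To illustrate why deferring the call alone does not help, here is a toy model in plain Java. It uses no Flink classes; `DeferredDeploySketch` and all of its members are hypothetical names. The version check in `deployLaterIfCurrent` is only an assumed way of detecting a stale deployment, added for contrast, and is not something proposed in this comment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model with hypothetical names; it does not use any Flink classes. */
final class DeferredDeploySketch {

    /** Stand-in for a single-threaded executor: queued actions run later, in order. */
    final Queue<Runnable> mainThreadQueue = new ArrayDeque<>();
    /** Every deployment that actually happened, in order. */
    final List<String> deployments = new ArrayList<>();
    /** Per-vertex version, bumped whenever a vertex is reset for a restart. */
    final Map<String, Integer> versions = new HashMap<>();

    /** Defers the deployment without any staleness check. */
    void deployLater(String vertexId) {
        mainThreadQueue.add(() -> deployments.add(vertexId));
    }

    /** Defers the deployment but skips it if the vertex was reset in the meantime. */
    void deployLaterIfCurrent(String vertexId) {
        int expected = versions.getOrDefault(vertexId, 0);
        mainThreadQueue.add(() -> {
            if (versions.getOrDefault(vertexId, 0) == expected) {
                deployments.add(vertexId);
            }
        });
    }

    /** Failover path: reset the vertex and deploy it again right away. */
    void restartNow(String vertexId) {
        versions.merge(vertexId, 1, Integer::sum);
        deployments.add(vertexId);
    }

    /** Runs all queued actions, as the executor eventually would. */
    void drainQueue() {
        Runnable next;
        while ((next = mainThreadQueue.poll()) != null) {
            next.run();
        }
    }

    /** Runs one scenario and returns how many times "v1" ended up deployed. */
    static int deploymentsOfV1(boolean guarded) {
        DeferredDeploySketch s = new DeferredDeploySketch();
        if (guarded) {
            s.deployLaterIfCurrent("v1");
        } else {
            s.deployLater("v1");
        }
        s.restartNow("v1"); // failover happens before the queued action runs
        s.drainQueue();
        return s.deployments.size();
    }

    public static void main(String[] args) {
        System.out.println(deploymentsOfV1(false)); // deferral only
        System.out.println(deploymentsOfV1(true));  // deferral plus version check
    }
}
```

In this model the unguarded variant deploys the vertex twice, because the queued action still runs after the restart has already re-deployed it.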
   
   I think there is a deeper underlying reason why these lines look 
confusing:
   ```
   schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
   maybeHandleTaskFailure(taskExecutionState, executionVertexId);
   ```
   
   The `SchedulingStrategy` gets informed about task failures but is not 
supposed to re-deploy or reason about failed tasks because the failure handling 
is done centrally (e.g. inside `flip1.FailoverStrategy`). 
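
The split of responsibilities described above can be sketched in plain Java as follows. This is a simplified illustration under my own assumptions: `CentralFailoverSketch`, `ObservingStrategy`, and `CentralFailureHandler` are hypothetical stand-ins, not the actual `SchedulingStrategy` or `flip1.FailoverStrategy` types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-ins; none of these are Flink's real classes. */
final class CentralFailoverSketch {

    enum State { RUNNING, FINISHED, FAILED }

    /** Is told about every state change, including failures, but never restarts anything. */
    static final class ObservingStrategy {
        final List<String> seen = new ArrayList<>();

        void onExecutionStateChange(String vertexId, State state) {
            seen.add(vertexId + ":" + state); // bookkeeping only
        }
    }

    /** The single place that decides which vertices get restarted. */
    static final class CentralFailureHandler {
        final List<String> restarted = new ArrayList<>();

        void handleFailure(String vertexId) {
            restarted.add(vertexId);
        }
    }

    /** Mirrors the two calls in the diff: notify the strategy, then handle a failure centrally. */
    static final class Scheduler {
        final ObservingStrategy strategy = new ObservingStrategy();
        final CentralFailureHandler failureHandler = new CentralFailureHandler();

        void updateTaskExecutionState(String vertexId, State state) {
            strategy.onExecutionStateChange(vertexId, state);
            if (state == State.FAILED) {
                failureHandler.handleFailure(vertexId);
            }
        }
    }

    public static void main(String[] args) {
        Scheduler scheduler = new Scheduler();
        scheduler.updateTaskExecutionState("v1", State.RUNNING);
        scheduler.updateTaskExecutionState("v1", State.FAILED);
        System.out.println(scheduler.strategy.seen);
        System.out.println(scheduler.failureHandler.restarted);
    }
}
```

The strategy sees the `FAILED` transition like any other state change, while only the central handler acts on it, which is why the two consecutive calls can look redundant at first glance.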
   
   At the moment, I wouldn't change anything. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
