GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r327163946
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -75,10 +137,281 @@ public DefaultScheduler(
slotRequestTimeout,
shuffleMaster,
partitionTracker);
+
+ this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+ this.slotRequestTimeout = slotRequestTimeout;
+ this.slotProvider = slotProvider;
+ this.delayExecutor = delayExecutor;
+ this.userCodeLoader = userCodeLoader;
+ this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory);
+ this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory);
+ this.executionVertexOperations = checkNotNull(executionVertexOperations);
+ this.executionVertexVersioner = executionVertexVersioner;
+ this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner);
+ }
+
+ // ------------------------------------------------------------------------
+ // SchedulerNG
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void startSchedulingInternal() {
+ initializeScheduling();
+ schedulingStrategy.startScheduling();
+ }
+
+ private void initializeScheduling() {
+ executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+ schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+ executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+ setTaskFailureListener(new UpdateTaskExecutionStateInDefaultSchedulerListener(this, getJobGraph().getJobID()));
+ prepareExecutionGraphForScheduling();
+ }
+
+ @Override
+ public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+ final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID());
+ if (executionVertexIdOptional.isPresent()) {
+ final ExecutionVertexID executionVertexId = executionVertexIdOptional.get();
+ updateState(taskExecutionState);
+ schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+ maybeHandleTaskFailure(taskExecutionState, executionVertexId);
Review comment:
@zhuzhurk
I gave this some thought.
> when we invoke `maybeHandleTaskFailure` right after invoking `schedulingStrategy.onExecutionStateChange`, the task state may even have changed in the call stack chain so that we are doing failover handling in an unexpected state
If we added `getMainThreadExecutor().execute()` to `allocateSlotsAndDeploy()`, I don't think it would solve this issue. We might still re-deploy tasks that were already re-deployed.
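The following is a minimal sketch of why deferring alone does not prevent a double re-deploy. It uses a toy model, not Flink's classes. A FIFO queue stands in for the main-thread executor. A per-vertex version counter plays a role loosely similar to the `executionVertexVersioner` passed to the constructor. All names here (`DeferredRedeployDemo`, `restartAndDeferRedeploy`, `recordModification`, `isModified`) are hypothetical and do not reflect Flink's actual signatures. When the same vertex is restarted twice before the queue drains, both deferred actions deploy. A check that the version captured at request time is still current skips the stale one.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical toy model, NOT Flink's API.
final class DeferredRedeployDemo {

    // Stand-in for a single-threaded main-thread executor: actions run later, in FIFO order.
    private final Queue<Runnable> mainThreadQueue = new ArrayDeque<>();
    // Per-vertex version, bumped on every restart (loosely modeled on a vertex versioner).
    private final Map<String, Long> versions = new HashMap<>();
    int deployments = 0;

    long recordModification(String vertex) {
        return versions.merge(vertex, 1L, Long::sum);
    }

    boolean isModified(String vertex, long expectedVersion) {
        return versions.getOrDefault(vertex, 0L) != expectedVersion;
    }

    // Restarts the vertex and defers its re-deployment. With checkVersion, the deferred
    // action is skipped if the vertex was restarted again after this request was made.
    void restartAndDeferRedeploy(String vertex, boolean checkVersion) {
        final long version = recordModification(vertex);
        mainThreadQueue.add(() -> {
            if (checkVersion && isModified(vertex, version)) {
                return; // stale request: superseded by a newer restart
            }
            deployments++;
        });
    }

    void drain() {
        Runnable action;
        while ((action = mainThreadQueue.poll()) != null) {
            action.run();
        }
    }

    static int run(boolean checkVersion) {
        DeferredRedeployDemo demo = new DeferredRedeployDemo();
        // Two failure-handling paths restart the same vertex before the queue is drained.
        demo.restartAndDeferRedeploy("v1", checkVersion);
        demo.restartAndDeferRedeploy("v1", checkVersion);
        demo.drain();
        return demo.deployments;
    }

    public static void main(String[] args) {
        System.out.println("without version check: " + run(false)); // 2
        System.out.println("with version check: " + run(true));     // 1
    }
}
```

Without the version check, moving work onto the executor only reorders the two deployments and does not deduplicate them. The check is what makes the second, stale request a no-op.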
I think there is a deeper underlying reason why these lines look confusing:
```java
schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
maybeHandleTaskFailure(taskExecutionState, executionVertexId);
```
The `SchedulingStrategy` is informed about task failures but is not supposed to re-deploy or otherwise reason about failed tasks, because failure handling is done centrally (e.g., inside `flip1.FailoverStrategy`).
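This toy sketch illustrates the division of responsibilities. It assumes hypothetical types `ToySchedulingStrategy` and `ToyFailoverHandler`, which only stand in for `SchedulingStrategy` and `flip1.FailoverStrategy` and are not their real interfaces. The strategy receives every state change but acts only on progress such as `FINISHED`. Failures are routed to a single central handler, following the call order in `updateTaskExecutionState`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy types, NOT Flink's interfaces.
final class CentralFailoverDemo {

    enum ExecState { FINISHED, FAILED }

    // Notified of every state change, but only reacts to progress.
    static final class ToySchedulingStrategy {
        final List<String> deployed = new ArrayList<>();

        void onExecutionStateChange(String vertex, ExecState state) {
            if (state == ExecState.FINISHED) {
                deployed.add(vertex + "-consumer"); // e.g. schedule downstream consumers
            }
            // FAILED is deliberately ignored: recovery is decided centrally.
        }
    }

    // Central failure handling: decides which vertices to restart.
    static final class ToyFailoverHandler {
        final List<String> restarted = new ArrayList<>();

        void handleFailure(String vertex) {
            restarted.add(vertex); // a real strategy might restart a whole region
        }
    }

    final ToySchedulingStrategy strategy = new ToySchedulingStrategy();
    final ToyFailoverHandler failover = new ToyFailoverHandler();

    // Same order as in the diff: notify the strategy first, then handle failures centrally.
    void updateTaskExecutionState(String vertex, ExecState state) {
        strategy.onExecutionStateChange(vertex, state);
        if (state == ExecState.FAILED) {
            failover.handleFailure(vertex);
        }
    }

    public static void main(String[] args) {
        CentralFailoverDemo demo = new CentralFailoverDemo();
        demo.updateTaskExecutionState("a", ExecState.FINISHED);
        demo.updateTaskExecutionState("b", ExecState.FAILED);
        System.out.println("deployed by strategy: " + demo.strategy.deployed);   // [a-consumer]
        System.out.println("restarted by failover: " + demo.failover.restarted); // [b]
    }
}
```

Because the strategy ignores `FAILED`, the two calls never compete to recover the same vertex. This is why the pair of lines reads oddly even though it is not a correctness problem by itself.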
At the moment, I wouldn't change anything.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure.
With regards,
Apache Git Services