StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r250989196
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -1113,25 +1113,23 @@ private void assignExecutionGraph(
private void resetAndScheduleExecutionGraph() throws Exception {
validateRunsInMainThread();
-
- final CompletableFuture<Void> executionGraphAssignedFuture;
-
+ MainThreadExecutor mainThreadExecutor = getMainThreadExecutor();
if (executionGraph.getState() == JobStatus.CREATED) {
- executionGraphAssignedFuture =
CompletableFuture.completedFuture(null);
+ scheduleExecutionGraph();
} else {
suspendAndClearExecutionGraphFields(new
FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
final JobManagerJobMetricGroup
newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
final ExecutionGraph newExecutionGraph =
createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);
- executionGraphAssignedFuture =
executionGraph.getTerminationFuture().handleAsync(
- (JobStatus ignored, Throwable throwable) -> {
- assignExecutionGraph(newExecutionGraph,
newJobManagerJobMetricGroup);
- return null;
- },
- getMainThreadExecutor());
+ executionGraph.getTerminationFuture()
+ .handleAsync(
+ (JobStatus ignored, Throwable
throwable) -> {
+
assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
+ return null;
+ },
+ mainThreadExecutor)
+ .thenRun(this::scheduleExecutionGraph);
Review comment:
Ok, fine with me as well :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services