XComp commented on a change in pull request #14593:
URL: https://github.com/apache/flink/pull/14593#discussion_r555753109



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
##########
@@ -667,7 +667,6 @@ private DefaultScheduler setupTestJobAndScheduler(
         final ComponentMainThreadExecutor mainThreadExecutor =
                 new ComponentMainThreadExecutorServiceAdapter(
                         (ScheduledExecutorService) executor, 
Thread.currentThread());

Review comment:
       ```suggestion
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -103,12 +105,15 @@
 
     private SchedulerTestingUtils() {}
 
-    public static DefaultSchedulerBuilder newSchedulerBuilder(final JobGraph 
jobGraph) {
-        return new DefaultSchedulerBuilder(jobGraph);
+    public static DefaultSchedulerBuilder newSchedulerBuilder(

Review comment:
       Does it make sense to add 
`ComponentMainThreadExecutorServiceAdapter.forMainThread()` as a default value? 
Quite a few of this method's occurrences are using the 
`ComponentMainThreadExecutorServiceAdapter`. 🤔 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -1284,10 +1270,12 @@ private void disposeAllOperatorCoordinators() {
         return coordinatorMap.values();
     }
 
-    private Map<OperatorID, OperatorCoordinatorHolder> createCoordinatorMap() {
+    private Map<OperatorID, OperatorCoordinatorHolder> createCoordinatorMap(
+            ComponentMainThreadExecutor mainThreadExecutor) {
         Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap = new 
HashMap<>();
         for (ExecutionJobVertex vertex : 
executionGraph.getAllVertices().values()) {
             for (OperatorCoordinatorHolder holder : 
vertex.getOperatorCoordinators()) {
+                holder.lazyInitialize(this, mainThreadExecutor);

Review comment:
       Just to double-check: 
`executionGraph.getAllVertices().values().stream().map(ExecutionJobVertex::getOperatorCoordinators)`
 contains the same `OperatorCoordinatorHolder` instances as 
`getAllCoordinators` which was used in `initializeOperatorCoordinators(..)`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to