XComp commented on code in PR #21019:
URL: https://github.com/apache/flink/pull/21019#discussion_r993677013


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -298,6 +303,27 @@ private JobGraph createJobGraph() {
         return jobGraph;
     }
 
+    private void waitForSerializationFuturesAndUpdateJobVertices()
+            throws ExecutionException, InterruptedException {
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            final 
List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>
+                    futuresForJobVertex = 
coordinatorSerializationFutures.remove(jobVertex.getID());
+            if (futuresForJobVertex == null) {
+                LOG.warn(
+                        "No OperatorCoordinator creator serialized for 
JobVertex {}.",
+                        jobVertex.getID());

Review Comment:
   :+1: Initially, I assumed that we would have to have a serialized 
`OperatorCoordinator` for each of the vertices. But going through the 
`JobGraph` creation, I realized that this is not the case. But in any case, 
using Precondition instead of logging a warning would have been the better 
option.
   
   Anyway, I updated the code and changed the for loop to iterate over the map 
of futures, instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to