Myasuka commented on code in PR #18523:
URL: https://github.com/apache/flink/pull/18523#discussion_r1093029060


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java:
##########
@@ -117,10 +120,25 @@ public static String generatePlan(
                 // write the core properties
                 JobVertexID vertexID = vertex.getID();
                 int storeParallelism = 
vertexParallelism.getParallelism(vertexID);
+                int parallelism =
+                        storeParallelism != -1 ? storeParallelism : 
vertex.getParallelism();

Review Comment:
   I think the `-1` here can also be replaced by 
`ExecutionConfig#PARALLELISM_DEFAULT`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##########
@@ -197,9 +199,23 @@ public ExecutionGraph createAndRestoreExecutionGraph(
                 // check whether we can restore from a savepoint
                 tryRestoreExecutionGraphFromSavepoint(
                         newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
+
+                for (JobVertex jobVertex : jobGraph.getVertices()) {
+                    int maxParallelism =
+                            newExecutionGraph
+                                    .getAllVertices()
+                                    .get(jobVertex.getID())
+                                    .getMaxParallelism();
+
+                    jobVertex.setMaxParallelism(maxParallelism);
+                }
             }
         }
 
+        // set the basic properties
+
+        
newExecutionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));

Review Comment:
   Can we just avoid the changes to the execution graph builder? This is a bit 
dangerous to touch the kernel part when only implementing a UI change.
   I think you can still apply the changes on the default json plan without 
these changes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java:
##########
@@ -117,10 +120,25 @@ public static String generatePlan(
                 // write the core properties
                 JobVertexID vertexID = vertex.getID();
                 int storeParallelism = 
vertexParallelism.getParallelism(vertexID);
+                int parallelism =
+                        storeParallelism != -1 ? storeParallelism : 
vertex.getParallelism();
                 gen.writeStringField("id", vertexID.toString());
-                gen.writeNumberField(
-                        "parallelism",
-                        storeParallelism != -1 ? storeParallelism : 
vertex.getParallelism());
+                gen.writeNumberField("parallelism", parallelism);
+                int storeMaxParallelism =
+                        vertexParallelism
+                                .getMaxParallelismForVertices()
+                                .getOrDefault(vertexID, 
MAX_PARALLELISM_DEFAULT);
+                int maxParallelism =
+                        storeMaxParallelism == MAX_PARALLELISM_DEFAULT
+                                ? (vertex.getMaxParallelism() == 
MAX_PARALLELISM_DEFAULT
+                                        ? (parallelism == 
MAX_PARALLELISM_DEFAULT

Review Comment:
   The condition should be `parallelism == 
ExecutionConfig#PARALLELISM_DEFAULT`. The same value `-1` can have different 
meanings.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java:
##########
@@ -30,6 +32,7 @@
  * SlotAllocator#tryReserveResources(VertexParallelism)}.
  */
 public interface VertexParallelism {
+    @Nonnull

Review Comment:
   I think it's better to add some javadoc here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to