JunRuiLee commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1111580649


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java:
##########
@@ -175,35 +175,28 @@ int getNumParallelProducers() {
         return numParallelProducers;
     }
 
-    /**
-     * Currently, this method is only used to compute the maximum number of consumers. For dynamic
-     * graph, it should be called before adaptively deciding the downstream consumer parallelism.
-     */
-    int getConsumersParallelism() {
-        List<JobEdge> consumers = intermediateDataSet.getConsumers();
-        checkState(!consumers.isEmpty());
-
-        InternalExecutionGraphAccessor graph = getProducer().getGraph();
-        int consumersParallelism =
-                graph.getJobVertex(consumers.get(0).getTarget().getID()).getParallelism();
-        if (consumers.size() == 1) {
-            return consumersParallelism;
+    int getMaxConsumerParallelism() {
+        if (getProducer().getGraph().isDynamic()) {

Review Comment:
   It seems that the dynamic graph does not require its consumers to have the
   same parallelism for partition reuse, whereas the non-dynamic graph does.
   What causes this difference? In other words, could we reset the operator's
   parallelism during jobGraph compilation, before setVertexConfig, instead of
   in the scheduler as in FLINK-30684?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -465,13 +465,7 @@ public static VertexParallelismStore computeVertexParallelismStoreForDynamicGrap
         // global default max parallelism.
         return computeVertexParallelismStore(
                 vertices,
-                v -> {
-                    if (v.getParallelism() > 0) {
-                        return getDefaultMaxParallelism(v);
-                    } else {
-                        return defaultMaxParallelism;
-                    }
-                },
+                v -> Math.max(defaultMaxParallelism, v.getParallelism()),

Review Comment:
   Why do we need this change? Is it related to a bug fix?
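
   To make the question concrete, here is a minimal sketch of how the two
   lambdas in the hunk above differ. It does not use Flink's classes: the
   class name, the method names `oldRule` and `newRule`, and the `derived`
   function (a stand-in for `getDefaultMaxParallelism(v)`, whose body is not
   shown in this diff) are all assumptions made for illustration.

```java
import java.util.function.IntUnaryOperator;

// Illustrative only: models the removed and added lambdas with plain ints.
class MaxParallelismSketch {

    // Removed lambda: if the parallelism is set (> 0), derive the max
    // parallelism from it; otherwise fall back to the global default.
    // `derived` is a hypothetical stand-in for getDefaultMaxParallelism(v).
    static int oldRule(int parallelism, int defaultMax, IntUnaryOperator derived) {
        return parallelism > 0 ? derived.applyAsInt(parallelism) : defaultMax;
    }

    // Added lambda: the larger of the global default and the parallelism.
    static int newRule(int parallelism, int defaultMax) {
        return Math.max(defaultMax, parallelism);
    }

    public static void main(String[] args) {
        int defaultMax = 128;                  // assumed global default
        IntUnaryOperator derived = p -> 2 * p; // purely illustrative derivation

        // Parallelism not set (-1): both rules fall back to the default.
        System.out.println(oldRule(-1, defaultMax, derived) + " vs " + newRule(-1, defaultMax));

        // Parallelism set below the default: the rules can disagree.
        System.out.println(oldRule(4, defaultMax, derived) + " vs " + newRule(4, defaultMax));

        // Parallelism set above the default: the new rule returns the parallelism.
        System.out.println(oldRule(200, defaultMax, derived) + " vs " + newRule(200, defaultMax));
    }
}
```

   With the new rule, the result no longer depends on the derivation helper
   when the parallelism is set, which is the behavior change I am asking
   about.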



