noorall commented on code in PR #25552:
URL: https://github.com/apache/flink/pull/25552#discussion_r1901523524


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -126,52 +136,101 @@ public ParallelismAndInputInfos 
decideParallelismAndInputInfosForVertex(
                             ? vertexInitialParallelism
                             : computeSourceParallelismUpperBound(jobVertexId, 
vertexMaxParallelism);
             return new ParallelismAndInputInfos(parallelism, 
Collections.emptyMap());
-        } else {
-            int minParallelism = Math.max(globalMinParallelism, 
vertexMinParallelism);
-            int maxParallelism = globalMaxParallelism;
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < minParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the 
minimum parallelism {}. "
-                                + "Use {} as the lower bound to decide 
parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        minParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                minParallelism = vertexMaxParallelism;
-            }
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < maxParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the 
global maximum parallelism {}. "
-                                + "Use {} as the upper bound to decide 
parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        maxParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                maxParallelism = vertexMaxParallelism;
-            }
-            checkState(maxParallelism >= minParallelism);
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && areAllInputsAllToAll(consumedResults)
-                    && !areAllInputsBroadcast(consumedResults)) {
-                return decideParallelismAndEvenlyDistributeData(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
-            } else {
-                return decideParallelismAndEvenlyDistributeSubpartitions(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
+        }
+
+        int minParallelism = Math.max(globalMinParallelism, 
vertexMinParallelism);
+        int maxParallelism = globalMaxParallelism;
+
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < minParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the 
minimum parallelism {}. "
+                            + "Use {} as the lower bound to decide parallelism 
of job vertex {}.",
+                    vertexMaxParallelism,
+                    minParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            minParallelism = vertexMaxParallelism;
+        }
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < maxParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the 
global maximum parallelism {}. "
+                            + "Use {} as the upper bound to decide parallelism 
of job vertex {}.",
+                    vertexMaxParallelism,
+                    maxParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            maxParallelism = vertexMaxParallelism;
+        }
+        checkState(maxParallelism >= minParallelism);
+
+        int parallelism =
+                vertexInitialParallelism > 0
+                        ? vertexInitialParallelism
+                        : decideParallelism(
+                                jobVertexId, consumedResults, minParallelism, 
maxParallelism);
+
+        Map<Boolean, List<BlockingInputInfo>> inputsGroupByInterCorrelation =
+                consumedResults.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        
BlockingInputInfo::existInterInputsKeyCorrelation));
+
+        // For AllToAll like inputs, we derive parallelism as a whole, while 
for Pointwise inputs,
+        // we need to derive parallelism separately for each input.
+        //
+        // In the following cases, we need to reset min parallelism and max 
parallelism to ensure
+        // that the decide parallelism for all inputs is consistent :
+        // 1.  Vertex has a specified parallelism
+        // 2.  There are edges that don't need to follow intergroup constraint

Review Comment:
   > Why?
   
   If the vertex specifies a parallelism, we should follow it;
   If there is a pointwise type (with inter set to false), since its 
parallelism deduction is carried out on a one-by-one basis for individual 
inputs, we need to pre-determine the parallelism to ensure that the parallelism 
for all inputs is consistent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to