zhuzhurk commented on code in PR #21570:
URL: https://github.com/apache/flink/pull/21570#discussion_r1063214490


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -233,6 +235,8 @@ private ParallelismAndInputInfos 
decideParallelismAndEvenlyDistributeData(
                             subpartitionRanges.size());
             if (!adjustedSubpartitionRanges.isPresent()) {
                 // can't find any legal parallelism, fall back to evenly 
distribute subpartitions
+                LOG.debug(

Review Comment:
   This should be an info(or warn?) log.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -233,6 +235,8 @@ private ParallelismAndInputInfos 
decideParallelismAndEvenlyDistributeData(
                             subpartitionRanges.size());
             if (!adjustedSubpartitionRanges.isPresent()) {
                 // can't find any legal parallelism, fall back to evenly 
distribute subpartitions
+                LOG.debug(
+                        "Cannot find a legal parallelism, fall back to evenly 
distribute subpartitions.");

Review Comment:
   -> Cannot find a legal parallelism to evenly distribute data for job vertex 
{}. Fall back to compute a parallelism that can evenly distribute subpartitions.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -134,23 +139,20 @@ private static boolean 
areAllInputsBroadcast(List<BlockingResultInfo> consumedRe
      */
     private ParallelismAndInputInfos 
decideParallelismAndEvenlyDistributeSubpartitions(
             List<BlockingResultInfo> consumedResults, int initialParallelism) {
-        int parallelism = decideParallelism(consumedResults, 
initialParallelism);
+        checkArgument(!consumedResults.isEmpty());
+        int parallelism =
+                initialParallelism > 0
+                        ? initialParallelism
+                        : decideParallelism(consumedResults, 
initialParallelism);
         return new ParallelismAndInputInfos(
                 parallelism,
-                consumedResults.isEmpty()
-                        ? Collections.emptyMap()
-                        : 
VertexInputInfoComputationUtils.computeVertexInputInfos(
-                                parallelism, consumedResults, true));
+                VertexInputInfoComputationUtils.computeVertexInputInfos(
+                        parallelism, consumedResults, true));
     }
 
     int decideParallelism(List<BlockingResultInfo> consumedResults, int 
initialParallelism) {
-        if (initialParallelism > 0) {
-            // parallelism has already been decided
-            return initialParallelism;
-        } else if (consumedResults.isEmpty()) {
-            // source job vertex
-            return defaultSourceParallelism;
-        }
+        checkArgument(initialParallelism == 
ExecutionConfig.PARALLELISM_DEFAULT);

Review Comment:
   Looks to me this param is no longer needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to