JunRuiLee commented on code in PR #22555:
URL: https://github.com/apache/flink/pull/22555#discussion_r1189846990


##########
flink-core/src/main/java/org/apache/flink/configuration/BatchExecutionOptions.java:
##########
@@ -99,7 +99,7 @@ public class BatchExecutionOptions {
     public static final ConfigOption<Integer> ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM =
             key("execution.batch.adaptive.auto-parallelism.default-source-parallelism")
                     .intType()
-                    .defaultValue(1)
+                    .defaultValue(0)
                     .withDeprecatedKeys(

Review Comment:
   Why does the default value need to change? I think a better way is to use
   `configuration.getOptional(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM)`
   to determine whether the option has been explicitly configured.
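
A minimal sketch of that idea, written without any Flink dependency so it can run on its own. `MiniConfig`, `resolveSourceParallelism`, and the `fallbackParallelism` parameter are hypothetical names used only for illustration; they are not part of the PR or of Flink. The sketch only shows why an `Optional` lookup can tell "unset" apart from "explicitly set to 1" without changing the option's default value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the real configuration classes; not Flink code.
class SourceParallelismSketch {

    static final String SOURCE_PARALLELISM_KEY =
            "execution.batch.adaptive.auto-parallelism.default-source-parallelism";

    /** Minimal config that stores only the keys the user set explicitly. */
    static final class MiniConfig {
        private final Map<String, Integer> values = new HashMap<>();

        MiniConfig set(String key, int value) {
            values.put(key, value);
            return this;
        }

        /** Returns an empty Optional when the key was never set. */
        Optional<Integer> getOptional(String key) {
            return Optional.ofNullable(values.get(key));
        }
    }

    /**
     * Uses the explicitly configured source parallelism if present,
     * otherwise falls back to the value supplied by the caller (an assumption
     * of this sketch; the PR discussion does not fix where it comes from).
     */
    static int resolveSourceParallelism(MiniConfig config, int fallbackParallelism) {
        return config.getOptional(SOURCE_PARALLELISM_KEY).orElse(fallbackParallelism);
    }

    public static void main(String[] args) {
        MiniConfig unset = new MiniConfig();
        MiniConfig explicitOne = new MiniConfig().set(SOURCE_PARALLELISM_KEY, 1);

        // Unset option: the fallback is used.
        System.out.println(resolveSourceParallelism(unset, 8));
        // Option explicitly set to 1: the user's value is respected.
        System.out.println(resolveSourceParallelism(explicitOne, 8));
    }
}
```

With this shape the option's default can stay at 1, since the caller never needs a sentinel value such as 0 to detect that nothing was configured.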



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -84,21 +85,28 @@ private DefaultVertexParallelismAndInputInfosDecider(
             int globalMaxParallelism,
             int globalMinParallelism,
             MemorySize dataVolumePerTask,
-            int globalDefaultSourceParallelism) {
+            int globalDefaultSourceParallelism,
+            int defaultExecutionParallelism) {
 
         checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0.");
         checkArgument(
                 globalMaxParallelism >= globalMinParallelism,
                 "Maximum parallelism should be greater than or equal to the minimum parallelism.");
         checkArgument(
-                globalDefaultSourceParallelism > 0,
-                "The default source parallelism must be larger than 0.");
+                globalDefaultSourceParallelism >= 0,
+                "The default source parallelism must be greater than or equal to 0.");
         checkNotNull(dataVolumePerTask);
+        checkArgument(
+                defaultExecutionParallelism > 0,
+                "The default execution parallelism must be larger than 0.");
 

Review Comment:
   We do not require the default parallelism to be greater than 0.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -546,7 +553,7 @@ static DefaultVertexParallelismAndInputInfosDecider from(
                 configuration.get(
                         BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK),
                 configuration.get(
-                        BatchExecutionOptions
-                                .ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM));
+                        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM),
+                configuration.get(CoreOptions.DEFAULT_PARALLELISM));
     }

Review Comment:
   `parallelism.default` should be obtained from the ExecutionConfig instead of
   the JobMaster Configuration.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -84,21 +85,28 @@ private DefaultVertexParallelismAndInputInfosDecider(
             int globalMaxParallelism,
             int globalMinParallelism,
             MemorySize dataVolumePerTask,
-            int globalDefaultSourceParallelism) {
+            int globalDefaultSourceParallelism,
+            int defaultExecutionParallelism) {
 

Review Comment:
   Maybe we can determine the source parallelism externally, without touching
   this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
