zhuzhurk commented on code in PR #21772:
URL: https://github.com/apache/flink/pull/21772#discussion_r1089997741


##########
flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java:
##########
@@ -111,6 +112,8 @@
     // This is used to assign a unique ID to every Transformation
     private static final AtomicInteger ID_COUNTER = new AtomicInteger(0);
 
+    private boolean parallelismConfigured;

Review Comment:
   It would be better to add a comment explaining what this field is used for, 
e.g. "If true, the parallelism of the transformation is explicitly set and 
should be respected. Otherwise the parallelism can be changed at runtime."



##########
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java:
##########
@@ -430,6 +435,7 @@ public boolean preVisit(PlanNode node) {
             // set parallelism
             int pd = node.getParallelism();
             vertex.setParallelism(pd);
+            vertex.setParallelismConfigured(node.isParallelismConfigured());

Review Comment:
   It looks to me like the chained tasks (nodes) are not considered?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -878,6 +879,10 @@ private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo cha
 
         int parallelism = streamNode.getParallelism();
 
+        if (streamGraph.isDynamic() && !streamNode.isParallelismConfigured()) {
+            streamNode.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT, false);

Review Comment:
   Why is this needed? In other words, would `jobVertex.setParallelism()` 
work on its own?
   What I'm concerned about here is whether this change will lead to a 
different chaining result.



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionBatchOptions.java:
##########
@@ -0,0 +1,19 @@
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.docs.Documentation;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Configuration options for the batch execution. */
+public class ExecutionBatchOptions {
+
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER

Review Comment:
   I think adding it to the EXPERT_SCHEDULING section is enough.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -755,6 +755,12 @@ public boolean isDynamic() {
         return dynamic;
     }
 
+    public void setParallelism(Integer vertexID, int parallelism, boolean parallelismConfigured) {
+        if (getStreamNode(vertexID) != null) {

Review Comment:
   vertexID -> vertexId



##########
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java:
##########
@@ -407,6 +409,14 @@ public void setParallelism(int parallelism) {
         this.parallelism = parallelism;
     }
 
+    public boolean isParallelismConfigured() {
+        return parallelismConfigured;
+    }
+
+    public void enableParallelismConfigured(boolean parallelismConfigured) {

Review Comment:
   enableParallelismConfigured -> setParallelismConfigured



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -254,6 +253,46 @@ void testEnabledUnalignedCheckAndDisabledCheckpointing() {
         assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
     }
 
+    @Test
+    public void testTransformationSetParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromSequence(1L, 3L).map(i -> i).setParallelism(10).print().setParallelism(20);
+        StreamGraph streamGraph = env.getStreamGraph();
+
+        // check the streamGraph parallelism configured
+        StreamNode[] streamNodes = streamGraph.getStreamNodes().toArray(new StreamNode[0]);
+        assertThat(streamNodes[0].isParallelismConfigured()).isFalse();
+        assertThat(streamNodes[1].isParallelismConfigured()).isTrue();
+        assertThat(streamNodes[2].isParallelismConfigured()).isTrue();
+
+        // check the jobGraph parallelism configured
+        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(3);
+        assertThat(vertices.get(0).isParallelismConfigured()).isFalse();
+        assertThat(vertices.get(1).isParallelismConfigured()).isTrue();
+        assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
+    }
+

Review Comment:
   It would be better to add a test to check that for a dynamic graph the 
vertex parallelism is set to -1, while for a non-dynamic graph the parallelism 
is the default parallelism.
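
A minimal sketch of the expectation such a test would encode, assuming the rule from this PR: a vertex whose parallelism was not explicitly configured is reset to `ExecutionConfig.PARALLELISM_DEFAULT` (-1) only when the graph is dynamic. `VertexParallelismSketch` and `expectedVertexParallelism` are hypothetical names for illustration and are not part of Flink.

```java
/** Hypothetical helper; it only models the rule the requested test should check. */
final class VertexParallelismSketch {

    // Mirrors ExecutionConfig.PARALLELISM_DEFAULT, which is -1.
    static final int PARALLELISM_DEFAULT = -1;

    /**
     * @param dynamicGraph whether the stream graph is dynamic
     * @param parallelismConfigured whether the user set the parallelism explicitly
     * @param nodeParallelism the parallelism currently stored on the node; for an
     *     unconfigured node this is assumed to be the environment's default parallelism
     */
    static int expectedVertexParallelism(
            boolean dynamicGraph, boolean parallelismConfigured, int nodeParallelism) {
        if (dynamicGraph && !parallelismConfigured) {
            // Left undecided so that it can be derived at runtime.
            return PARALLELISM_DEFAULT;
        }
        return nodeParallelism;
    }
}
```

With a default parallelism of 4, the dynamic case should yield -1 for an unconfigured vertex, and the non-dynamic case should keep 4.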



##########
flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java:
##########
@@ -183,11 +186,31 @@ public static int getNewNodeId() {
      * @param parallelism The parallelism of this {@code Transformation}
      */
     public Transformation(String name, TypeInformation<T> outputType, int parallelism) {
+        this(name, outputType, parallelism, true);
+    }
+
+    /**
+     * Creates a new {@code Transformation} with the given name, output type and parallelism.
+     *
+     * @param name The name of the {@code Transformation}, this will be shown in Visualizations and
+     *     the Log
+     * @param outputType The output type of this {@code Transformation}
+     * @param parallelism The parallelism of this {@code Transformation}
+     * @param parallelismConfigured if true, the parallelism of transformation is not use default
+     *     parallelism

Review Comment:
   Maybe "If true, the parallelism of the transformation is explicitly set and 
should be respected. Otherwise the parallelism can be changed at runtime."



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java:
##########
@@ -168,6 +169,17 @@ public SchedulerNG createInstance(
                 new VertexwiseSchedulingStrategy.Factory(
                         loadInputConsumableDeciderFactory(hybridPartitionDataConsumeConstraint));
 
+        int defaultMaxParallelism =
+                jobMasterConfiguration
+                        .getOptional(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM)
+                        .orElse(
+                                jobMasterConfiguration
+                                        .getOptional(CoreOptions.DEFAULT_PARALLELISM)
+                                        .orElse(
+                                                jobMasterConfiguration.getInteger(
+                                                        JobManagerOptions
+                                                                .ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM)));

Review Comment:
   It would be clearer to use 
`ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue()` directly here.
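
To illustrate the point: by the time the innermost branch is reached, the max-parallelism option is already known to be unset, so reading it from the configuration again can only return its default. A minimal sketch of the fallback chain using plain `java.util.Optional`, where `DefaultMaxParallelismSketch`, `resolve` and `MAX_PARALLELISM_DEFAULT` are hypothetical stand-ins rather than Flink API:

```java
import java.util.Optional;

/** Hypothetical stand-in for the fallback logic; no Flink classes are involved. */
final class DefaultMaxParallelismSketch {

    // Assumed placeholder for ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue().
    static final int MAX_PARALLELISM_DEFAULT = 128;

    /**
     * Resolution order: the explicitly configured max parallelism, then the configured
     * default parallelism, then the option's own default value.
     */
    static int resolve(Optional<Integer> configuredMax, Optional<Integer> defaultParallelism) {
        return configuredMax.orElse(defaultParallelism.orElse(MAX_PARALLELISM_DEFAULT));
    }
}
```

Naming the default value directly in the last step makes the three-level order readable without having to reason about what a second lookup would return.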



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionBatchOptions.java:
##########
@@ -0,0 +1,19 @@
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.docs.Documentation;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Configuration options for the batch execution. */

Review Comment:
   the batch execution -> batch job execution



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java:
##########
@@ -191,7 +193,12 @@ public int getParallelism() {
     }
 
     public void setParallelism(Integer parallelism) {
+        setParallelism(parallelism, true);
+    }
+
+    void setParallelism(Integer parallelism, boolean parallelismConfigured) {
         this.parallelism = parallelism;
+        this.parallelismConfigured = parallelismConfigured;

Review Comment:
   If `parallelism < 0`, `parallelismConfigured` should be false.
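
A minimal sketch of the guard, assuming a negative value always means "not set". `StreamNodeSketch` is a hypothetical stand-in that models only the parallelism bookkeeping, not the real `StreamNode`:

```java
/** Hypothetical stand-in for StreamNode; only the two parallelism fields are modeled. */
final class StreamNodeSketch {

    private int parallelism;
    private boolean parallelismConfigured;

    void setParallelism(int parallelism, boolean parallelismConfigured) {
        this.parallelism = parallelism;
        // A negative parallelism can never count as explicitly configured,
        // regardless of what the caller passes in.
        this.parallelismConfigured = parallelismConfigured && parallelism >= 0;
    }

    int getParallelism() {
        return parallelism;
    }

    boolean isParallelismConfigured() {
        return parallelismConfigured;
    }
}
```

With this guard, a call such as `setParallelism(-1, true)` leaves the flag false, so callers cannot accidentally mark an unset parallelism as configured.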



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1861,10 +1880,16 @@ private OperatorID addNodeToChain(int currentNodeId, String operatorName) {
                 operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
             }
 
-            streamGraph
-                    .getStreamNode(currentNodeId)
+            streamNode
                     .getCoordinatorProvider(operatorName, new OperatorID(getHash(currentNodeId)))
                     .map(coordinatorProviders::add);
+
+            if (streamGraph.isDynamic()
+                    && !streamNode.isParallelismConfigured()
+                    && startNodeId != currentNodeId) {
+                streamNode.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT, false);

Review Comment:
   Why do we need this change?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -141,6 +141,8 @@ public class StreamGraph implements Pipeline {
 
     private boolean dynamic;
 
+    private boolean autoParallelism;

Review Comment:
   autoParallelism -> autoParallelismEnabled



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionBatchOptions.java:
##########
@@ -0,0 +1,19 @@
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.docs.Documentation;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Configuration options for the batch execution. */
+public class ExecutionBatchOptions {

Review Comment:
   I'd prefer to name it `BatchExecutionOptions`.



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionBatchOptions.java:
##########
@@ -0,0 +1,19 @@
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.docs.Documentation;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Configuration options for the batch execution. */
+public class ExecutionBatchOptions {
+
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Boolean> ADAPTIVE_AUTO_PARALLELISM_ENABLED =
+            key("execution.batch.adaptive.auto-parallelism.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Controls whether to enable auto parallelism derivation.");

Review Comment:
   ->  If true, Flink will automatically decide the parallelism of operators in 
batch jobs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
