zhuzhurk commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1115923261


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -928,14 +1011,8 @@ private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo cha
         return new StreamConfig(jobVertex.getConfiguration());
     }
 
-    private void setVertexConfig(
-            Integer vertexID,
-            StreamConfig config,
-            List<StreamEdge> chainableOutputs,
-            List<StreamEdge> nonChainableOutputs,
-            Map<Integer, ChainedSourceInfo> chainedSources) {
-
-        tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);
+    private void setOperatorConfig(
+            Integer vertexID, StreamConfig config, Map<Integer, ChainedSourceInfo> chainedSources) {

Review Comment:
   Rename `vertexID` to `vertexId`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroup.java:
##########
@@ -53,9 +55,22 @@ public ForwardGroup(final Set<JobVertex> jobVertices) {
                         .map(JobVertex::getParallelism)
                         .collect(Collectors.toSet());
 
-        checkState(decidedParallelisms.size() <= 1);
-        if (decidedParallelisms.size() == 1) {
-            this.parallelism = decidedParallelisms.iterator().next();
+        checkState(configuredParallelisms.size() <= 1);
+        if (configuredParallelisms.size() == 1) {
+            this.parallelism = configuredParallelisms.iterator().next();
+        }
+
+        Set<Integer> configuredMaxParallelisms =
+                jobVertices.stream()
+                        .map(JobVertex::getMaxParallelism)
+                        .filter(val -> val > 0)
+                        .collect(Collectors.toSet());
+
+        if (!configuredMaxParallelisms.isEmpty()) {
+            this.maxParallelism = Collections.min(configuredMaxParallelisms);
+            checkState(
+                    maxParallelism >= parallelism,

Review Comment:
   The `parallelism` can still be -1 (not yet decided) while the `maxParallelism` is configured, so this check needs to handle that case.
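
One way to address this is to run the comparison only when the parallelism has been decided. The following is a minimal sketch, not the PR's code: `ForwardGroupSketch`, its constructor arguments, and the `UNDECIDED` constant are hypothetical stand-ins, and it assumes -1 is the "not decided" value mentioned in the comment above.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical, simplified stand-in for ForwardGroup; not Flink's actual class. */
final class ForwardGroupSketch {
    // Assumption: -1 marks a value that has not been decided yet.
    static final int UNDECIDED = -1;

    private int parallelism = UNDECIDED;
    private int maxParallelism = UNDECIDED;

    ForwardGroupSketch(List<Integer> vertexParallelisms, List<Integer> vertexMaxParallelisms) {
        // Collect only explicitly configured (positive) parallelisms.
        Set<Integer> configuredParallelisms =
                vertexParallelisms.stream().filter(p -> p > 0).collect(Collectors.toSet());
        if (configuredParallelisms.size() > 1) {
            throw new IllegalStateException("Conflicting parallelisms in one forward group");
        }
        if (configuredParallelisms.size() == 1) {
            this.parallelism = configuredParallelisms.iterator().next();
        }

        // Collect only explicitly configured (positive) max parallelisms.
        Set<Integer> configuredMaxParallelisms =
                vertexMaxParallelisms.stream().filter(p -> p > 0).collect(Collectors.toSet());
        if (!configuredMaxParallelisms.isEmpty()) {
            this.maxParallelism = Collections.min(configuredMaxParallelisms);
            // Compare only when the parallelism is decided; with -1 the
            // comparison would always pass and verify nothing.
            if (parallelism != UNDECIDED && maxParallelism < parallelism) {
                throw new IllegalStateException(
                        "Max parallelism " + maxParallelism
                                + " is smaller than parallelism " + parallelism);
            }
        }
    }

    int getParallelism() {
        return parallelism;
    }

    int getMaxParallelism() {
        return maxParallelism;
    }
}
```

With an undecided parallelism, `maxParallelism >= parallelism` is trivially true, so the explicit guard mainly documents the intent and keeps the error message from reporting -1.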



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -758,6 +771,76 @@ private List<StreamEdge> createChain(
         }
     }
 
+    private void setVertexParallelismsForDynamicGraphIfNecessary() {

Review Comment:
   Better to add a comment explaining what this method is for, e.g. that it 
ensures the parallelism and maxParallelism of vertices in the same forward 
group are the same, and that it sets the parallelism at an early stage if 
possible, to avoid invalid partition reuse.
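
A Javadoc along these lines could look like the sketch below. The types `VertexSketch` and `ForwardGroupAlignmentSketch` and the method `alignForwardGroup` are hypothetical, introduced only to make the example self-contained; the real method operates on the generator's own state, and the -1 "undecided" value is an assumption.

```java
import java.util.List;

/** Hypothetical minimal vertex, standing in for JobVertex. */
final class VertexSketch {
    int parallelism = -1;     // assumption: -1 means "not decided"
    int maxParallelism = -1;  // assumption: -1 means "not decided"
}

final class ForwardGroupAlignmentSketch {

    /**
     * Aligns the vertices of one forward group.
     *
     * <p>Vertices in the same forward group must end up with the same parallelism and
     * the same max parallelism. If the group's values are already decided, they are
     * applied to every vertex at this early stage, to avoid invalid partition reuse.
     *
     * @param group vertices that belong to the same forward group
     * @param groupParallelism decided parallelism of the group, or -1 if undecided
     * @param groupMaxParallelism decided max parallelism of the group, or -1 if undecided
     */
    static void alignForwardGroup(
            List<VertexSketch> group, int groupParallelism, int groupMaxParallelism) {
        for (VertexSketch vertex : group) {
            if (groupParallelism > 0) {
                vertex.parallelism = groupParallelism;
            }
            if (groupMaxParallelism > 0) {
                vertex.maxParallelism = groupMaxParallelism;
            }
        }
    }
}
```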



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1955,6 +2091,14 @@ private OperatorID addNodeToChain(int currentNodeId, String operatorName) {
             return new OperatorID(primaryHashBytes);
         }
 
+        private void setTransitiveOutEdges(final List<StreamEdge> transitiveOutEdges) {
+            this.transitiveOutEdges.addAll(transitiveOutEdges);
+        }
+
+        public List<StreamEdge> getTransitiveOutEdges() {

Review Comment:
   ```suggestion
           private List<StreamEdge> getTransitiveOutEdges() {
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -568,12 +592,7 @@ && isChainableInput(sourceOutEdge, streamGraph)) {
                     final StreamConfig.SourceInputConfig inputConfig =
                             new StreamConfig.SourceInputConfig(sourceOutEdge);
                     final StreamConfig operatorConfig = new StreamConfig(new Configuration());
-                    setVertexConfig(
-                            sourceNodeId,
-                            operatorConfig,
-                            Collections.emptyList(),
-                            Collections.emptyList(),
-                            Collections.emptyMap());
+                    setOperatorConfig(sourceNodeId, operatorConfig, Collections.emptyMap());

Review Comment:
   IIUC, `setChainedOutputs(emptyList)` was previously called here, but is no 
longer called after this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.