zhuzhurk commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1869800542


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1678,10 +1678,27 @@ public static ResultPartitionType determineUndefinedResultPartitionType(
     public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
         StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
 
-        return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
+        return downStreamVertex.getInEdges().size() == 1

Review Comment:
   It looks to me like these changes to `StreamingJobGraphGenerator` belong in 
the commit (or a fixup commit for it) "Further Improve Method Reusability in 
StreamingJobGraphGenerator"?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java:
##########
@@ -1266,6 +1335,41 @@ public void cancel() {}
         return env.getStreamGraph();
     }
 
+    public static StreamGraph createStreamGraphForUnknownResourceSpecManagedMemoryFractionTest(

Review Comment:
   It looks like this method is not used anywhere?



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1418,6 +1419,80 @@ public void deserializeUserDefinedInstances(
         this.userDefinedObjectsHolder.deserialize(userClassLoader, serializationExecutor);
     }
 
+    // --------------------------------------------------------------------------------------------
+    //  Topological Graph Access
+    // --------------------------------------------------------------------------------------------
+
+    public List<StreamNode> getStreamNodesSortedTopologicallyFromSources()
+            throws InvalidProgramException {
+        // early out on empty lists
+        if (this.streamNodes.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<StreamNode> sorted = new ArrayList<>(streamNodes.size());
+        Set<StreamNode> remaining = new LinkedHashSet<>(streamNodes.values());
+
+        // start by source nodes
+        {

Review Comment:
   Bare brace blocks like this (a `{ ... }` scope with no control statement) 
should be avoided.
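
   One possible way to address this, sketched under assumptions: move the 
body of the block into a named private helper. `Node`, `inputs`, and 
`moveSourcesToSorted` below are hypothetical stand-ins rather than names 
from the PR, and the body of the original block is not visible in this hunk, 
so the logic is a guess based on the "start by source nodes" comment.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class BracketBlockSketch {

    // Hypothetical stand-in for StreamNode; it carries only what the sketch needs.
    static final class Node {
        final int id;
        final List<Node> inputs = new ArrayList<>();

        Node(int id) {
            this.id = id;
        }
    }

    // The step that previously sat in a bare { ... } block becomes a named method,
    // so its purpose and the variables it touches are explicit.
    // Assumption: a "source" is a node without any inputs.
    static void moveSourcesToSorted(Set<Node> remaining, List<Node> sorted) {
        Iterator<Node> it = remaining.iterator();
        while (it.hasNext()) {
            Node node = it.next();
            if (node.inputs.isEmpty()) {
                sorted.add(node);
                it.remove();
            }
        }
    }
}
```

   The caller would then invoke the helper once before the main loop, which 
keeps the local variable scoping that the bare block was presumably there for.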



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java:
##########
@@ -227,10 +236,21 @@ public Integer getProducerStreamNodeId(IntermediateDataSetID intermediateDataSet
         return intermediateDataSetIdToProducerMap.get(intermediateDataSetID);
     }
 
+    /**
+     * Retrieves the stream edges that subscribe to the IntermediateDataSet.
+     *
+     * @param intermediateDataSetID the ID of the IntermediateDataSet
+     * @return the stream edges that subscribe to the IntermediateDataSet
+     */
+    public List<StreamEdge> getOutputStreamEdges(IntermediateDataSetID intermediateDataSetID) {

Review Comment:
   intermediateDataSetID -> intermediateDataSetId



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1678,10 +1678,27 @@ public static ResultPartitionType determineUndefinedResultPartitionType(
     public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
         StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
 
-        return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
+        return downStreamVertex.getInEdges().size() == 1
+                && isChainableInput(edge, streamGraph)
+                && checkAndSetParallelismForChainableNodes(edge, streamGraph);

Review Comment:
   It's a bit odd to update node parallelism/maxParallelism in a method named 
`isChainable()`; a name like that suggests a side-effect-free check.
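
   A rough sketch of one way to separate the two concerns, assuming the 
parallelism update can be deferred to the caller. `Node`, 
`PARALLELISM_UNSET`, and `alignParallelism` are hypothetical, and the 
parallelism rule shown is only a guess at what 
`checkAndSetParallelismForChainableNodes` checks, since its body is not part 
of this hunk.

```java
public class ChainabilitySketch {

    // Hypothetical sentinel for "parallelism not decided yet".
    static final int PARALLELISM_UNSET = -1;

    // Hypothetical stand-in for StreamNode.
    static final class Node {
        int parallelism;
        final int inEdgeCount;

        Node(int parallelism, int inEdgeCount) {
            this.parallelism = parallelism;
            this.inEdgeCount = inEdgeCount;
        }
    }

    // Pure query: reads node state and never mutates it.
    // Assumption: nodes can be chained when the downstream node has a single
    // input and its parallelism either matches the upstream or is still unset.
    static boolean isChainable(Node upstream, Node downstream) {
        return downstream.inEdgeCount == 1
                && (downstream.parallelism == upstream.parallelism
                        || downstream.parallelism == PARALLELISM_UNSET);
    }

    // Explicit mutation, invoked by whichever code actually builds the chain.
    static void alignParallelism(Node upstream, Node downstream) {
        if (downstream.parallelism == PARALLELISM_UNSET) {
            downstream.parallelism = upstream.parallelism;
        }
    }
}
```

   With this split, callers that only ask whether an edge is chainable cannot 
change the graph by accident, and the place where parallelism gets fixed is 
visible at the call site.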



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1418,6 +1419,80 @@ public void deserializeUserDefinedInstances(
         this.userDefinedObjectsHolder.deserialize(userClassLoader, serializationExecutor);
     }
 
+    // --------------------------------------------------------------------------------------------
+    //  Topological Graph Access
+    // --------------------------------------------------------------------------------------------
+
+    public List<StreamNode> getStreamNodesSortedTopologicallyFromSources()

Review Comment:
   Unit tests should be added for this method.
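
   As a rough illustration of the properties such tests could check, here is 
a self-contained sketch over plain integer ids. It uses Kahn's algorithm, 
which may differ from the PR's implementation, and it throws 
`IllegalStateException` where the real method declares 
`InvalidProgramException`. `sortFromSources` and `respectsEdges` are 
hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopoSortSketch {

    // outEdges maps a producer id to the ids of its consumers.
    static List<Integer> sortFromSources(Map<Integer, List<Integer>> outEdges) {
        // Count incoming edges for every node that appears as a key or a target.
        Map<Integer, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<Integer>> entry : outEdges.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (Integer target : entry.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        // Sources are the nodes without incoming edges.
        Deque<Integer> ready = new ArrayDeque<>();
        for (Map.Entry<Integer, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<Integer> sorted = new ArrayList<>(inDegree.size());
        while (!ready.isEmpty()) {
            Integer node = ready.poll();
            sorted.add(node);
            for (Integer target : outEdges.getOrDefault(node, Collections.emptyList())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }

        // Nodes left unsorted can only be part of a cycle.
        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("The graph contains a cycle.");
        }
        return sorted;
    }

    // Property a unit test would assert: every producer precedes its consumers.
    static boolean respectsEdges(List<Integer> order, Map<Integer, List<Integer>> outEdges) {
        Map<Integer, Integer> position = new HashMap<>();
        for (int i = 0; i < order.size(); i++) {
            position.put(order.get(i), i);
        }
        for (Map.Entry<Integer, List<Integer>> entry : outEdges.entrySet()) {
            for (Integer target : entry.getValue()) {
                if (position.get(entry.getKey()) >= position.get(target)) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

   Cases worth covering for the real method would likely include an empty 
graph, a linear chain, a diamond (one node with two upstream paths), and a 
cyclic graph that is expected to fail.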


