zhuzhurk commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1869800542
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1678,10 +1678,27 @@ public static ResultPartitionType determineUndefinedResultPartitionType(
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
- return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
+ return downStreamVertex.getInEdges().size() == 1
Review Comment:
It looks to me like these changes to `StreamingJobGraphGenerator` should go
into the commit (or a fixup commit for) "Further Improve Method Reusability in
StreamingJobGraphGenerator"?
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java:
##########
@@ -1266,6 +1335,41 @@ public void cancel() {}
return env.getStreamGraph();
}
+ public static StreamGraph createStreamGraphForUnknownResourceSpecManagedMemoryFractionTest(
Review Comment:
It looks to me like this method is not used?
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1418,6 +1419,80 @@ public void deserializeUserDefinedInstances(
this.userDefinedObjectsHolder.deserialize(userClassLoader, serializationExecutor);
}
+ // --------------------------------------------------------------------------------------------
+ // Topological Graph Access
+ // --------------------------------------------------------------------------------------------
+
+ public List<StreamNode> getStreamNodesSortedTopologicallyFromSources()
+ throws InvalidProgramException {
+ // early out on empty lists
+ if (this.streamNodes.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<StreamNode> sorted = new ArrayList<>(streamNodes.size());
+ Set<StreamNode> remaining = new LinkedHashSet<>(streamNodes.values());
+
+ // start by source nodes
+ {
Review Comment:
This kind of bare brace block (a `{ ... }` scope with no control statement) should be avoided.
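One possible alternative is to give each phase of the sort its own named helper method. The following is only a rough sketch under stated assumptions: it uses a hypothetical map of node ids to input ids instead of Flink's `StreamNode`/`StreamGraph`, the class and method names are invented for illustration, and it throws `IllegalStateException` as a stand-in for `InvalidProgramException`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: a tiny graph keyed by node id, not Flink's StreamGraph. */
public class TopologicalSortSketch {

    /** Returns node ids in topological order, starting from nodes without inputs. */
    public static List<Integer> sortFromSources(Map<Integer, List<Integer>> inputsByNode) {
        if (inputsByNode.isEmpty()) {
            return Collections.emptyList();
        }
        List<Integer> sorted = new ArrayList<>(inputsByNode.size());
        Set<Integer> remaining = new LinkedHashSet<>(inputsByNode.keySet());

        // Each phase is a named helper instead of a bare { ... } block.
        addSources(inputsByNode, sorted, remaining);
        addNodesWithSortedInputs(inputsByNode, sorted, remaining);
        return sorted;
    }

    /** Moves every node that has no inputs from 'remaining' to 'sorted'. */
    private static void addSources(
            Map<Integer, List<Integer>> inputsByNode, List<Integer> sorted, Set<Integer> remaining) {
        for (Integer node : new ArrayList<>(remaining)) {
            if (inputsByNode.get(node).isEmpty()) {
                sorted.add(node);
                remaining.remove(node);
            }
        }
    }

    /** Repeatedly moves nodes whose inputs are all sorted; fails if no progress is made. */
    private static void addNodesWithSortedInputs(
            Map<Integer, List<Integer>> inputsByNode, List<Integer> sorted, Set<Integer> remaining) {
        while (!remaining.isEmpty()) {
            boolean progressed = false;
            for (Integer node : new ArrayList<>(remaining)) {
                if (sorted.containsAll(inputsByNode.get(node))) {
                    sorted.add(node);
                    remaining.remove(node);
                    progressed = true;
                }
            }
            if (!progressed) {
                // Stand-in for InvalidProgramException in the real code.
                throw new IllegalStateException("The graph contains a cycle.");
            }
        }
    }
}
```

The helper names also make the "start by source nodes" comment redundant, since the method name carries that meaning.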
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java:
##########
@@ -227,10 +236,21 @@ public Integer getProducerStreamNodeId(IntermediateDataSetID intermediateDataSet
return intermediateDataSetIdToProducerMap.get(intermediateDataSetID);
}
+ /**
+ * Retrieves the stream edges that subscribe to the IntermediateDataSet.
+ *
+ * @param intermediateDataSetID the ID of the IntermediateDataSet
+ * @return the stream edges that subscribe to the IntermediateDataSet
+ */
+ public List<StreamEdge> getOutputStreamEdges(IntermediateDataSetID intermediateDataSetID) {
Review Comment:
intermediateDataSetID -> intermediateDataSetId
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1678,10 +1678,27 @@ public static ResultPartitionType determineUndefinedResultPartitionType(
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
- return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
+ return downStreamVertex.getInEdges().size() == 1
+ && isChainableInput(edge, streamGraph)
+ && checkAndSetParallelismForChainableNodes(edge, streamGraph);
Review Comment:
It's a bit odd to update node parallelism/maxParallelism in a method named
`isChainable()`; a name like that suggests a check without side effects.
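One way to separate the two concerns is to keep the check free of side effects and make the update an explicit, separately named step. This is a minimal sketch only: the `Node` class, the `UNSET` constant, `alignParallelism`, and the parallelism rule itself are hypothetical and do not reflect what `checkAndSetParallelismForChainableNodes` actually does.

```java
/** Illustrative only: hypothetical stand-ins, not Flink's StreamNode or StreamEdge. */
public class ChainingSketch {

    /** Assumed marker for "parallelism not decided yet". */
    public static final int UNSET = -1;

    /** Hypothetical node with a mutable parallelism and a fixed number of input edges. */
    public static final class Node {
        public int parallelism;
        public final int inEdgeCount;

        public Node(int parallelism, int inEdgeCount) {
            this.parallelism = parallelism;
            this.inEdgeCount = inEdgeCount;
        }
    }

    /** Pure query: never modifies either node. */
    public static boolean isChainable(Node upstream, Node downstream) {
        boolean parallelismCompatible =
                upstream.parallelism == downstream.parallelism
                        || upstream.parallelism == UNSET
                        || downstream.parallelism == UNSET;
        return downstream.inEdgeCount == 1 && parallelismCompatible;
    }

    /** Explicit mutation: copies a known parallelism onto the side that has none. */
    public static void alignParallelism(Node upstream, Node downstream) {
        if (downstream.parallelism == UNSET) {
            downstream.parallelism = upstream.parallelism;
        } else if (upstream.parallelism == UNSET) {
            upstream.parallelism = downstream.parallelism;
        }
    }
}
```

A caller would then write `if (isChainable(up, down)) { alignParallelism(up, down); }`, so the state change is visible at the call site.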
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1418,6 +1419,80 @@ public void deserializeUserDefinedInstances(
this.userDefinedObjectsHolder.deserialize(userClassLoader, serializationExecutor);
}
+ // --------------------------------------------------------------------------------------------
+ // Topological Graph Access
+ // --------------------------------------------------------------------------------------------
+
+ public List<StreamNode> getStreamNodesSortedTopologicallyFromSources()
Review Comment:
Unit tests should be added for this method.
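Such tests could assert the ordering property directly rather than one fixed order, since several topological orders can be valid for the same graph. Below is a rough sketch of such a check in plain Java, without JUnit or Flink types. The class name, the method name, and the map-of-ids graph representation are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: checks an ordering against a graph given as node id -> input ids. */
public class TopologicalOrderCheck {

    /** Returns true if every node appears exactly once and after all of its inputs. */
    public static boolean isTopologicallyOrdered(
            List<Integer> order, Map<Integer, List<Integer>> inputsByNode) {
        if (order.size() != inputsByNode.size()) {
            return false;
        }
        Set<Integer> seen = new HashSet<>();
        for (Integer node : order) {
            List<Integer> inputs = inputsByNode.get(node);
            // Reject unknown nodes, nodes placed before an input, and duplicates.
            if (inputs == null || !seen.containsAll(inputs) || !seen.add(node)) {
                return false;
            }
        }
        return true;
    }
}
```

A real test would build a small `StreamGraph` (for example a diamond-shaped one), call the new method, and apply an equivalent check to the returned nodes. A case for the empty graph and one for a cyclic graph would also be useful.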
--
This is an automated message from the Apache Git Service.