matriv commented on a change in pull request #18420:
URL: https://github.com/apache/flink/pull/18420#discussion_r788769178
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
##########
@@ -640,50 +640,75 @@ private void addEdgeInternal(
                    outputTag,
                    exchangeMode);
        } else {
-            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
-            StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
-            // If no partitioner was specified and the parallelism of upstream and downstream
-            // operator matches use forward partitioning, use rebalance otherwise.
-            if (partitioner == null
-                    && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
-                partitioner = new ForwardPartitioner<Object>();
-            } else if (partitioner == null) {
-                partitioner = new RebalancePartitioner<Object>();
-            }
+            createActualEdge(
+                    upStreamVertexID,
+                    downStreamVertexID,
+                    typeNumber,
+                    partitioner,
+                    outputTag,
+                    exchangeMode);
+        }
+    }
-            if (partitioner instanceof ForwardPartitioner) {
-                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
-                    throw new UnsupportedOperationException(
-                            "Forward partitioning does not allow "
-                                    + "change of parallelism. Upstream operation: "
-                                    + upstreamNode
-                                    + " parallelism: "
-                                    + upstreamNode.getParallelism()
-                                    + ", downstream operation: "
-                                    + downstreamNode
-                                    + " parallelism: "
-                                    + downstreamNode.getParallelism()
-                                    + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
-                }
-            }
+    private void createActualEdge(
+            Integer upStreamVertexID,
+            Integer downStreamVertexID,
+            int typeNumber,
+            StreamPartitioner<?> partitioner,
+            OutputTag outputTag,
+            StreamExchangeMode exchangeMode) {
+        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
+        StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+        // If no partitioner was specified and the parallelism of upstream and downstream
+        // operator matches use forward partitioning, use rebalance otherwise.
+        if (partitioner == null
+                && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
+            partitioner = new ForwardPartitioner<Object>();
+        } else if (partitioner == null) {
+            partitioner = new RebalancePartitioner<Object>();
+        }
-            if (exchangeMode == null) {
-                exchangeMode = StreamExchangeMode.UNDEFINED;
+        if (partitioner instanceof ForwardPartitioner) {
+            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
+                throw new UnsupportedOperationException(
+                        "Forward partitioning does not allow "
+                                + "change of parallelism. Upstream operation: "
+                                + upstreamNode
+                                + " parallelism: "
+                                + upstreamNode.getParallelism()
+                                + ", downstream operation: "
+                                + downstreamNode
+                                + " parallelism: "
+                                + downstreamNode.getParallelism()
+                                + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
+        }
-            StreamEdge edge =
-                    new StreamEdge(
-                            upstreamNode,
-                            downstreamNode,
-                            typeNumber,
-                            partitioner,
-                            outputTag,
-                            exchangeMode);
-
-            getStreamNode(edge.getSourceId()).addOutEdge(edge);
-            getStreamNode(edge.getTargetId()).addInEdge(edge);
+        if (exchangeMode == null) {
+            exchangeMode = StreamExchangeMode.UNDEFINED;
        }
+
+        /**
+         * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of
+         * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be
+         * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link
+         * StreamEdge}.
+         */
+        int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();
Review comment:
Thank you @pnowojski! I understood the concept (it's well explained in the javadoc); I was just a bit confused by the `size()` here. But yes, each time this runs for the same pair of nodes, the previously created `StreamEdge`s are already in the list, so `size()` returns the next unique id. Got it!