matriv commented on a change in pull request #18420:
URL: https://github.com/apache/flink/pull/18420#discussion_r788739259
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
##########
@@ -640,50 +640,75 @@ private void addEdgeInternal(
outputTag,
exchangeMode);
} else {
- StreamNode upstreamNode = getStreamNode(upStreamVertexID);
- StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
- // If no partitioner was specified and the parallelism of upstream
and downstream
- // operator matches use forward partitioning, use rebalance
otherwise.
- if (partitioner == null
- && upstreamNode.getParallelism() ==
downstreamNode.getParallelism()) {
- partitioner = new ForwardPartitioner<Object>();
- } else if (partitioner == null) {
- partitioner = new RebalancePartitioner<Object>();
- }
+ createActualEdge(
+ upStreamVertexID,
+ downStreamVertexID,
+ typeNumber,
+ partitioner,
+ outputTag,
+ exchangeMode);
+ }
+ }
- if (partitioner instanceof ForwardPartitioner) {
- if (upstreamNode.getParallelism() !=
downstreamNode.getParallelism()) {
- throw new UnsupportedOperationException(
- "Forward partitioning does not allow "
- + "change of parallelism. Upstream
operation: "
- + upstreamNode
- + " parallelism: "
- + upstreamNode.getParallelism()
- + ", downstream operation: "
- + downstreamNode
- + " parallelism: "
- + downstreamNode.getParallelism()
- + " You must use another partitioning
strategy, such as broadcast, rebalance, shuffle or global.");
- }
- }
+ private void createActualEdge(
+ Integer upStreamVertexID,
+ Integer downStreamVertexID,
+ int typeNumber,
+ StreamPartitioner<?> partitioner,
+ OutputTag outputTag,
+ StreamExchangeMode exchangeMode) {
+ StreamNode upstreamNode = getStreamNode(upStreamVertexID);
+ StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+ // If no partitioner was specified and the parallelism of upstream and
downstream
+ // operator matches use forward partitioning, use rebalance otherwise.
+ if (partitioner == null
+ && upstreamNode.getParallelism() ==
downstreamNode.getParallelism()) {
+ partitioner = new ForwardPartitioner<Object>();
+ } else if (partitioner == null) {
+ partitioner = new RebalancePartitioner<Object>();
+ }
- if (exchangeMode == null) {
- exchangeMode = StreamExchangeMode.UNDEFINED;
+ if (partitioner instanceof ForwardPartitioner) {
+ if (upstreamNode.getParallelism() !=
downstreamNode.getParallelism()) {
+ throw new UnsupportedOperationException(
+ "Forward partitioning does not allow "
+ + "change of parallelism. Upstream operation: "
+ + upstreamNode
+ + " parallelism: "
+ + upstreamNode.getParallelism()
+ + ", downstream operation: "
+ + downstreamNode
+ + " parallelism: "
+ + downstreamNode.getParallelism()
+ + " You must use another partitioning
strategy, such as broadcast, rebalance, shuffle or global.");
}
+ }
- StreamEdge edge =
- new StreamEdge(
- upstreamNode,
- downstreamNode,
- typeNumber,
- partitioner,
- outputTag,
- exchangeMode);
-
- getStreamNode(edge.getSourceId()).addOutEdge(edge);
- getStreamNode(edge.getTargetId()).addInEdge(edge);
+ if (exchangeMode == null) {
+ exchangeMode = StreamExchangeMode.UNDEFINED;
}
+
+ /**
+ * Just make sure that {@link StreamEdge} connecting same nodes (for
example as a result of
+ * self unioning a {@link DataStream}) are distinct and unique.
Otherwise it would be
+ * difficult on the {@link StreamTask} to assign {@link RecordWriter}s
to correct {@link
+ * StreamEdge}.
+ */
+ int uniqueId = getStreamEdges(upstreamNode.getId(),
downstreamNode.getId()).size();
Review comment:
Since the `uniqueId` here is an `int`, I guess it can also be an `int`
instead of a `long` in `StreamEdge`.
Moreover, is the `size()` of this list enough to uniquely identify the
`StreamEdge`?
Wouldn't it be better to have an atomically incremented int instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]