pnowojski commented on a change in pull request #18420:
URL: https://github.com/apache/flink/pull/18420#discussion_r788754806
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
##########
@@ -640,50 +640,75 @@ private void addEdgeInternal(
outputTag,
exchangeMode);
} else {
- StreamNode upstreamNode = getStreamNode(upStreamVertexID);
- StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
- // If no partitioner was specified and the parallelism of upstream and downstream
- // operator matches use forward partitioning, use rebalance otherwise.
- if (partitioner == null
- && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
- partitioner = new ForwardPartitioner<Object>();
- } else if (partitioner == null) {
- partitioner = new RebalancePartitioner<Object>();
- }
+ createActualEdge(
+ upStreamVertexID,
+ downStreamVertexID,
+ typeNumber,
+ partitioner,
+ outputTag,
+ exchangeMode);
+ }
+ }
- if (partitioner instanceof ForwardPartitioner) {
- if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
- throw new UnsupportedOperationException(
- "Forward partitioning does not allow "
- + "change of parallelism. Upstream operation: "
- + upstreamNode
- + " parallelism: "
- + upstreamNode.getParallelism()
- + ", downstream operation: "
- + downstreamNode
- + " parallelism: "
- + downstreamNode.getParallelism()
- + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
- }
- }
+ private void createActualEdge(
+ Integer upStreamVertexID,
+ Integer downStreamVertexID,
+ int typeNumber,
+ StreamPartitioner<?> partitioner,
+ OutputTag outputTag,
+ StreamExchangeMode exchangeMode) {
+ StreamNode upstreamNode = getStreamNode(upStreamVertexID);
+ StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+ // If no partitioner was specified and the parallelism of upstream and downstream
+ // operator matches use forward partitioning, use rebalance otherwise.
+ if (partitioner == null
+ && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
+ partitioner = new ForwardPartitioner<Object>();
+ } else if (partitioner == null) {
+ partitioner = new RebalancePartitioner<Object>();
+ }
- if (exchangeMode == null) {
- exchangeMode = StreamExchangeMode.UNDEFINED;
+ if (partitioner instanceof ForwardPartitioner) {
+ if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
+ throw new UnsupportedOperationException(
+ "Forward partitioning does not allow "
+ + "change of parallelism. Upstream operation: "
+ + upstreamNode
+ + " parallelism: "
+ + upstreamNode.getParallelism()
+ + ", downstream operation: "
+ + downstreamNode
+ + " parallelism: "
+ + downstreamNode.getParallelism()
+ + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
+ }
- StreamEdge edge =
- new StreamEdge(
- upstreamNode,
- downstreamNode,
- typeNumber,
- partitioner,
- outputTag,
- exchangeMode);
-
- getStreamNode(edge.getSourceId()).addOutEdge(edge);
- getStreamNode(edge.getTargetId()).addInEdge(edge);
+ if (exchangeMode == null) {
+ exchangeMode = StreamExchangeMode.UNDEFINED;
}
+
+ /**
+ * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of
+ * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be
+ * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link
+ * StreamEdge}.
+ */
+ int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();
Review comment:
> is the size() of this list enough to uniquely identify the StreamEdge?
> Wouldn't it be better to have an atomically incremented int instead?
I've explained what kind of uniqueness we need in the Javadoc of the field.
`size()` alone is not enough to uniquely identify a `StreamEdge`, but combined
with `sourceId` and `targetId` it is. As for why not a globally incremented
integer: I thought about it, but I wanted the code to be more self-explanatory
about why we need this `uniqueId`, namely that the tuple of `sourceId`,
`targetId` and partitioning scheme is not enough to distinguish edges. By using
```
getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();
```
I'm ensuring almost exactly this (modulo partitioning schemes): if there is
more than one edge connecting `sourceId` and `targetId`, they will have
different `uniqueId`s.
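To illustrate the argument, here is a minimal, self-contained sketch. The
`Edge` and `Graph` classes are hypothetical stand-ins, not Flink's
`StreamEdge` or `StreamGraph`, and the sketch assumes edges are only ever added,
never removed, between two insertions for the same pair of nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class EdgeIdSketch {

    /** Hypothetical stand-in for StreamEdge: only the identity-relevant fields. */
    static final class Edge {
        final int sourceId;
        final int targetId;
        final int uniqueId;

        Edge(int sourceId, int targetId, int uniqueId) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.uniqueId = uniqueId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Edge)) {
                return false;
            }
            Edge other = (Edge) o;
            return sourceId == other.sourceId
                    && targetId == other.targetId
                    && uniqueId == other.uniqueId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(sourceId, targetId, uniqueId);
        }
    }

    /** Hypothetical stand-in for the graph: an append-only list of edges. */
    static final class Graph {
        private final List<Edge> edges = new ArrayList<>();

        /** Returns all edges that connect the given pair of nodes. */
        List<Edge> getEdges(int sourceId, int targetId) {
            List<Edge> result = new ArrayList<>();
            for (Edge e : edges) {
                if (e.sourceId == sourceId && e.targetId == targetId) {
                    result.add(e);
                }
            }
            return result;
        }

        /** Adds an edge whose uniqueId is the count of existing parallel edges. */
        Edge addEdge(int sourceId, int targetId) {
            int uniqueId = getEdges(sourceId, targetId).size();
            Edge edge = new Edge(sourceId, targetId, uniqueId);
            edges.add(edge);
            return edge;
        }
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        Edge first = graph.addEdge(1, 2);
        Edge second = graph.addEdge(1, 2); // same endpoints, e.g. a self-union
        Edge other = graph.addEdge(1, 3);

        System.out.println(first.uniqueId);       // 0
        System.out.println(second.uniqueId);      // 1
        System.out.println(other.uniqueId);       // 0, unique only per node pair
        System.out.println(first.equals(second)); // false
    }
}
```

Note that `uniqueId` repeats across different node pairs (`first` and `other`
both get 0), which is why it only identifies an edge together with `sourceId`
and `targetId`.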