Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4649#discussion_r161797181
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
---
@@ -411,6 +413,13 @@ private void addEdgeInternal(Integer upStreamVertexID,
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+ Tuple2<Integer, Integer> edgePair = new Tuple2<>(upstreamNode.getId(), downstreamNode.getId());
+ if (!uniqueEdgeMap.containsKey(edgePair)) {
+ uniqueEdgeMap.put(edgePair, 1);
+ } else {
+ uniqueEdgeMap.put(edgePair, uniqueEdgeMap.get(edgePair) + 1);
--- End diff --
Maybe wrap all of this new code into a function, e.g.:
```
int edgeSubId = generateUniqueEdgeSubId(edgePair);
(...)
StreamEdge edge = new StreamEdge(..., edgeSubId);
```
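For illustration, a minimal sketch of what such a helper might look like. Only `generateUniqueEdgeSubId`, `uniqueEdgeMap` and `edgePair` come from this thread. The class name `EdgeSubIdSketch` is hypothetical, `Map.Entry` stands in for Flink's `Tuple2` so the snippet compiles on its own, and returning the updated per-pair count as the sub-id is an assumption, since the diff above is cut off before it shows how the counter is used.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone holder; in the PR this state would live in StreamGraph.
final class EdgeSubIdSketch {

    // Number of edges added so far for each (upstream id, downstream id) pair.
    private final Map<Map.Entry<Integer, Integer>, Integer> uniqueEdgeMap = new HashMap<>();

    // Assumption: the sub-id is the updated count, so the first edge
    // between a pair gets 1, the second gets 2, and so on.
    public int generateUniqueEdgeSubId(Map.Entry<Integer, Integer> edgePair) {
        // merge() stores 1 if the key is absent, otherwise adds 1 to the
        // existing value, and returns the value now stored. This replaces
        // the containsKey/put/get sequence from the diff.
        return uniqueEdgeMap.merge(edgePair, 1, Integer::sum);
    }
}
```

With this sketch, two calls for the pair (1, 2) would return 1 and then 2, while a first call for (1, 3) would return 1 again, because each pair is counted independently.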
---