pnowojski commented on a change in pull request #18420:
URL: https://github.com/apache/flink/pull/18420#discussion_r788754806



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
##########
@@ -640,50 +640,75 @@ private void addEdgeInternal(
                     outputTag,
                     exchangeMode);
         } else {
-            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
-            StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
-            // If no partitioner was specified and the parallelism of upstream and downstream
-            // operator matches use forward partitioning, use rebalance otherwise.
-            if (partitioner == null
-                    && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
-                partitioner = new ForwardPartitioner<Object>();
-            } else if (partitioner == null) {
-                partitioner = new RebalancePartitioner<Object>();
-            }
+            createActualEdge(
+                    upStreamVertexID,
+                    downStreamVertexID,
+                    typeNumber,
+                    partitioner,
+                    outputTag,
+                    exchangeMode);
+        }
+    }
 
-            if (partitioner instanceof ForwardPartitioner) {
-                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
-                    throw new UnsupportedOperationException(
-                            "Forward partitioning does not allow "
-                                    + "change of parallelism. Upstream operation: "
-                                    + upstreamNode
-                                    + " parallelism: "
-                                    + upstreamNode.getParallelism()
-                                    + ", downstream operation: "
-                                    + downstreamNode
-                                    + " parallelism: "
-                                    + downstreamNode.getParallelism()
-                                    + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
-                }
-            }
+    private void createActualEdge(
+            Integer upStreamVertexID,
+            Integer downStreamVertexID,
+            int typeNumber,
+            StreamPartitioner<?> partitioner,
+            OutputTag outputTag,
+            StreamExchangeMode exchangeMode) {
+        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
+        StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+        // If no partitioner was specified and the parallelism of upstream and downstream
+        // operator matches use forward partitioning, use rebalance otherwise.
+        if (partitioner == null
+                && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
+            partitioner = new ForwardPartitioner<Object>();
+        } else if (partitioner == null) {
+            partitioner = new RebalancePartitioner<Object>();
+        }
 
-            if (exchangeMode == null) {
-                exchangeMode = StreamExchangeMode.UNDEFINED;
+        if (partitioner instanceof ForwardPartitioner) {
+            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
+                throw new UnsupportedOperationException(
+                        "Forward partitioning does not allow "
+                                + "change of parallelism. Upstream operation: "
+                                + upstreamNode
+                                + " parallelism: "
+                                + upstreamNode.getParallelism()
+                                + ", downstream operation: "
+                                + downstreamNode
+                                + " parallelism: "
+                                + downstreamNode.getParallelism()
+                                + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
             }
+        }
 
-            StreamEdge edge =
-                    new StreamEdge(
-                            upstreamNode,
-                            downstreamNode,
-                            typeNumber,
-                            partitioner,
-                            outputTag,
-                            exchangeMode);
-
-            getStreamNode(edge.getSourceId()).addOutEdge(edge);
-            getStreamNode(edge.getTargetId()).addInEdge(edge);
+        if (exchangeMode == null) {
+            exchangeMode = StreamExchangeMode.UNDEFINED;
         }
+
+        /**
+         * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of
+         * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be
+         * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link
+         * StreamEdge}.
+         */
+        int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();

Review comment:
   > is the size() of this list enough to uniquely identify the StreamEdge?
   > Wouldn't it be better to have an atomically incremented int instead?
   
   I've explained what kind of uniqueness we need in the Javadoc of the field. `size()` alone is not enough to uniquely identify a `StreamEdge`, but combined with `sourceId` and `targetId` it is. As for why I didn't use a globally incremented integer: I thought about it, but I wanted the code to be more self-explanatory about why we need this `uniqueId`, namely that the tuple of `sourceId`, `targetId`, and partitioning scheme is not enough to distinguish edges. By using
   ```
   getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();
   ```
   I'm ensuring almost exactly this (modulo partitioning schemes): if there is more than one edge connecting `sourceId` and `targetId`, they will have different `uniqueId`s.
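
To illustrate the kind of uniqueness described above, here is a minimal, self-contained sketch. It does not use Flink: `EdgeIdSketch`, `Edge`, `getEdges`, and `addEdge` are hypothetical stand-ins for `StreamGraph`, `StreamEdge`, `getStreamEdges`, and the edge-creation path, reduced to the identifying fields. It assumes edges are only ever added, never removed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a stream graph; this is not Flink's StreamGraph. */
class EdgeIdSketch {

    /** Hypothetical stand-in for StreamEdge, reduced to the identifying fields. */
    static final class Edge {
        final int sourceId;
        final int targetId;
        final int uniqueId;

        Edge(int sourceId, int targetId, int uniqueId) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.uniqueId = uniqueId;
        }

        @Override
        public String toString() {
            return sourceId + "->" + targetId + "#" + uniqueId;
        }
    }

    private final List<Edge> edges = new ArrayList<>();

    /** Returns all edges that already connect the given pair of nodes. */
    List<Edge> getEdges(int sourceId, int targetId) {
        List<Edge> result = new ArrayList<>();
        for (Edge edge : edges) {
            if (edge.sourceId == sourceId && edge.targetId == targetId) {
                result.add(edge);
            }
        }
        return result;
    }

    /** Adds an edge whose uniqueId is the number of edges already connecting the same pair. */
    Edge addEdge(int sourceId, int targetId) {
        int uniqueId = getEdges(sourceId, targetId).size();
        Edge edge = new Edge(sourceId, targetId, uniqueId);
        edges.add(edge);
        return edge;
    }

    public static void main(String[] args) {
        EdgeIdSketch graph = new EdgeIdSketch();
        System.out.println(graph.addEdge(1, 2)); // first edge between 1 and 2
        System.out.println(graph.addEdge(1, 2)); // second edge, e.g. from a self-union
        System.out.println(graph.addEdge(1, 3)); // different pair, counter starts over
    }
}
```

In this sketch the two edges between nodes 1 and 2 get `uniqueId` 0 and 1, while the edge from 1 to 3 gets 0 again. The id is therefore not unique across the graph, only in combination with `sourceId` and `targetId`, which is the property the comment relies on.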





