This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 9c1bf6c  [FLINK-25199][network] Make sure StreamEdges are unique
9c1bf6c is described below

commit 9c1bf6cdb3a2ea9d1b7f2b7d8e687a003ce9d919
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Jan 20 09:31:24 2022 +0100

    [FLINK-25199][network] Make sure StreamEdges are unique
    
    Previously, if a node was unioned with itself, two identical
    StreamEdges were created: both with the same partitioning, from
    the same source node to the same target node.
    
    This was causing issues when constructing output collectors and
    picking the correct RecordWriters, as StreamTask was not able to
    uniquely identify a given StreamEdge and was assigning the same
    RecordWriter to both of the edges. As a result all stream elements
    were sent twice through the same RecordWriter. It was actually pretty
    harmless apart from calculating the combined watermark downstream,
    since all watermarks were always coming from just one single
    edge/inputgate, and the unused edges were always stuck with
    min watermark.
    
    As a solution we are making sure that StreamEdges are unique
    by introducing a uniqueId field, incremented for every additional
    StreamEdge connecting the same pair of nodes.
---
 .../flink/streaming/api/graph/StreamEdge.java      |  34 ++++++-
 .../flink/streaming/api/graph/StreamGraph.java     | 103 +++++++++++++--------
 .../flink/streaming/api/graph/StreamNode.java      |  11 +++
 .../test/streaming/runtime/TimestampITCase.java    |  19 ++++
 .../test/savepoint-921b4e-06351b8d4134/_metadata   | Bin 0 -> 198 bytes
 5 files changed, 123 insertions(+), 44 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index f76edd5..2ea5671 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -44,6 +44,15 @@ public class StreamEdge implements Serializable {
     private final int sourceId;
     private final int targetId;
 
+    /**
+     * Note that this field doesn't have to be unique among all {@link 
StreamEdge}s. It's enough if
+     * this field ensures that all logical instances of {@link StreamEdge} are 
unique, and {@link
+     * #hashCode()} are different and {@link #equals(Object)} returns false, 
for every possible pair
+     * of {@link StreamEdge}. Especially among two different {@link 
StreamEdge}s that are connecting
+     * the same pair of nodes.
+     */
+    private final int uniqueId;
+
     /** The type number of the input for co-tasks. */
     private final int typeNumber;
     /** The side-output tag (if any) of this {@link StreamEdge}. */
@@ -78,7 +87,8 @@ public class StreamEdge implements Serializable {
                 ALWAYS_FLUSH_BUFFER_TIMEOUT,
                 outputPartitioner,
                 outputTag,
-                ShuffleMode.UNDEFINED);
+                ShuffleMode.UNDEFINED,
+                0);
     }
 
     public StreamEdge(
@@ -87,7 +97,8 @@ public class StreamEdge implements Serializable {
             int typeNumber,
             StreamPartitioner<?> outputPartitioner,
             OutputTag outputTag,
-            ShuffleMode shuffleMode) {
+            ShuffleMode shuffleMode,
+            int uniqueId) {
 
         this(
                 sourceVertex,
@@ -96,7 +107,8 @@ public class StreamEdge implements Serializable {
                 sourceVertex.getBufferTimeout(),
                 outputPartitioner,
                 outputTag,
-                shuffleMode);
+                shuffleMode,
+                uniqueId);
     }
 
     public StreamEdge(
@@ -106,10 +118,12 @@ public class StreamEdge implements Serializable {
             long bufferTimeout,
             StreamPartitioner<?> outputPartitioner,
             OutputTag outputTag,
-            ShuffleMode shuffleMode) {
+            ShuffleMode shuffleMode,
+            int uniqueId) {
 
         this.sourceId = sourceVertex.getId();
         this.targetId = targetVertex.getId();
+        this.uniqueId = uniqueId;
         this.typeNumber = typeNumber;
         this.bufferTimeout = bufferTimeout;
         this.outputPartitioner = outputPartitioner;
@@ -118,7 +132,15 @@ public class StreamEdge implements Serializable {
         this.targetOperatorName = targetVertex.getOperatorName();
         this.shuffleMode = checkNotNull(shuffleMode);
         this.edgeId =
-                sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + 
outputPartitioner;
+                sourceVertex
+                        + "_"
+                        + targetVertex
+                        + "_"
+                        + typeNumber
+                        + "_"
+                        + outputPartitioner
+                        + "_"
+                        + uniqueId;
     }
 
     public int getSourceId() {
@@ -198,6 +220,8 @@ public class StreamEdge implements Serializable {
                 + bufferTimeout
                 + ", outputTag="
                 + outputTag
+                + ", uniqueId="
+                + uniqueId
                 + ')';
     }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 7b39b19e..d2ecac1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -614,50 +614,75 @@ public class StreamGraph implements Pipeline {
                     outputTag,
                     shuffleMode);
         } else {
-            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
-            StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
-            // If no partitioner was specified and the parallelism of upstream 
and downstream
-            // operator matches use forward partitioning, use rebalance 
otherwise.
-            if (partitioner == null
-                    && upstreamNode.getParallelism() == 
downstreamNode.getParallelism()) {
-                partitioner = new ForwardPartitioner<Object>();
-            } else if (partitioner == null) {
-                partitioner = new RebalancePartitioner<Object>();
-            }
+            createActualEdge(
+                    upStreamVertexID,
+                    downStreamVertexID,
+                    typeNumber,
+                    partitioner,
+                    outputTag,
+                    shuffleMode);
+        }
+    }
 
-            if (partitioner instanceof ForwardPartitioner) {
-                if (upstreamNode.getParallelism() != 
downstreamNode.getParallelism()) {
-                    throw new UnsupportedOperationException(
-                            "Forward partitioning does not allow "
-                                    + "change of parallelism. Upstream 
operation: "
-                                    + upstreamNode
-                                    + " parallelism: "
-                                    + upstreamNode.getParallelism()
-                                    + ", downstream operation: "
-                                    + downstreamNode
-                                    + " parallelism: "
-                                    + downstreamNode.getParallelism()
-                                    + " You must use another partitioning 
strategy, such as broadcast, rebalance, shuffle or global.");
-                }
-            }
+    private void createActualEdge(
+            Integer upStreamVertexID,
+            Integer downStreamVertexID,
+            int typeNumber,
+            StreamPartitioner<?> partitioner,
+            OutputTag outputTag,
+            ShuffleMode shuffleMode) {
+        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
+        StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+        // If no partitioner was specified and the parallelism of upstream and 
downstream
+        // operator matches use forward partitioning, use rebalance otherwise.
+        if (partitioner == null
+                && upstreamNode.getParallelism() == 
downstreamNode.getParallelism()) {
+            partitioner = new ForwardPartitioner<Object>();
+        } else if (partitioner == null) {
+            partitioner = new RebalancePartitioner<Object>();
+        }
 
-            if (shuffleMode == null) {
-                shuffleMode = ShuffleMode.UNDEFINED;
+        if (partitioner instanceof ForwardPartitioner) {
+            if (upstreamNode.getParallelism() != 
downstreamNode.getParallelism()) {
+                throw new UnsupportedOperationException(
+                        "Forward partitioning does not allow "
+                                + "change of parallelism. Upstream operation: "
+                                + upstreamNode
+                                + " parallelism: "
+                                + upstreamNode.getParallelism()
+                                + ", downstream operation: "
+                                + downstreamNode
+                                + " parallelism: "
+                                + downstreamNode.getParallelism()
+                                + " You must use another partitioning 
strategy, such as broadcast, rebalance, shuffle or global.");
             }
+        }
 
-            StreamEdge edge =
-                    new StreamEdge(
-                            upstreamNode,
-                            downstreamNode,
-                            typeNumber,
-                            partitioner,
-                            outputTag,
-                            shuffleMode);
-
-            getStreamNode(edge.getSourceId()).addOutEdge(edge);
-            getStreamNode(edge.getTargetId()).addInEdge(edge);
+        if (shuffleMode == null) {
+            shuffleMode = ShuffleMode.UNDEFINED;
         }
+
+        /**
+         * Just make sure that {@link StreamEdge} connecting same nodes (for 
example as a result of
+         * self unioning a {@link DataStream}) are distinct and unique. 
Otherwise it would be
+         * difficult on the {@link StreamTask} to assign {@link RecordWriter}s 
to correct {@link
+         * StreamEdge}.
+         */
+        int uniqueId = getStreamEdges(upstreamNode.getId(), 
downstreamNode.getId()).size();
+
+        StreamEdge edge =
+                new StreamEdge(
+                        upstreamNode,
+                        downstreamNode,
+                        typeNumber,
+                        partitioner,
+                        outputTag,
+                    shuffleMode,
+                        uniqueId);
+
+        getStreamNode(edge.getSourceId()).addOutEdge(edge);
+        getStreamNode(edge.getTargetId()).addInEdge(edge);
     }
 
     public void setParallelism(Integer vertexID, int parallelism) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 151a4b7..06cbd63 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -45,6 +45,7 @@ import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** Class representing the operators in the streaming programs, with all their 
properties. */
 @Internal
@@ -120,6 +121,11 @@ public class StreamNode {
     }
 
     public void addInEdge(StreamEdge inEdge) {
+        checkState(
+                outEdges.stream().noneMatch(inEdge::equals),
+                "Adding not unique edge = %s to existing outEdges = %s",
+                inEdge,
+                inEdges);
         if (inEdge.getTargetId() != getId()) {
             throw new IllegalArgumentException("Destination id doesn't match 
the StreamNode id");
         } else {
@@ -128,6 +134,11 @@ public class StreamNode {
     }
 
     public void addOutEdge(StreamEdge outEdge) {
+        checkState(
+                outEdges.stream().noneMatch(outEdge::equals),
+                "Adding not unique edge = %s to existing outEdges = %s",
+                outEdge,
+                outEdges);
         if (outEdge.getSourceId() != getId()) {
             throw new IllegalArgumentException("Source id doesn't match the 
StreamNode id");
         } else {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 1dc9413..d49b129 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -160,6 +160,25 @@ public class TimestampITCase extends TestLogger {
     }
 
     @Test
+    public void testSelfUnionWatermarkPropagation() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);
+
+        dataStream1
+                .union(dataStream1)
+                .transform(
+                        "Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new 
CustomOperator(false))
+                .addSink(new DiscardingSink<>());
+        env.execute();
+
+        assertEquals(
+                Watermark.MAX_WATERMARK,
+                CustomOperator.finalWatermarks[0].get(
+                        CustomOperator.finalWatermarks[0].size() - 1));
+    }
+
+    @Test
     public void testWatermarkPropagationNoFinalWatermarkOnStop() throws 
Exception {
 
         // for this test to work, we need to be sure that no other jobs are 
being executed
diff --git a/flink-tests/test/savepoint-921b4e-06351b8d4134/_metadata 
b/flink-tests/test/savepoint-921b4e-06351b8d4134/_metadata
new file mode 100644
index 0000000..372a6c4
Binary files /dev/null and 
b/flink-tests/test/savepoint-921b4e-06351b8d4134/_metadata differ

Reply via email to