This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 9c1bf6c [FLINK-25199][network] Make sure StreamEdges are unique
9c1bf6c is described below
commit 9c1bf6cdb3a2ea9d1b7f2b7d8e687a003ce9d919
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Jan 20 09:31:24 2022 +0100
[FLINK-25199][network] Make sure StreamEdges are unique
Previously, if there was a node that was unioned with itself,
it was creating a situation with two identical StreamEdges. Both
with the same partitioning, from the same source node to the same
target node.
This was causing issues when constructing output collectors and
picking the correct RecordWriters, as StreamTask was not able to
uniquely identify given StreamEdge and was assigning the same
RecordWriter to both of the edges. As a result all stream elements
were sent twice through the same RecordWriter. It was actually pretty
harmless apart from calculating the combined watermark downstream,
since all watermarks were always coming just from one single
edge/inputgate, and the unused edges were always stuck with
min watermark.
As a solution we are making sure that StreamEdges are unique
by introducing a uniqueId field, incremented for every additional
StreamEdge connecting the same pair of nodes.
---
.../flink/streaming/api/graph/StreamEdge.java | 34 ++++++-
.../flink/streaming/api/graph/StreamGraph.java | 103 +++++++++++++--------
.../flink/streaming/api/graph/StreamNode.java | 11 +++
.../test/streaming/runtime/TimestampITCase.java | 19 ++++
.../test/savepoint-921b4e-06351b8d4134/_metadata | Bin 0 -> 198 bytes
5 files changed, 123 insertions(+), 44 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index f76edd5..2ea5671 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -44,6 +44,15 @@ public class StreamEdge implements Serializable {
private final int sourceId;
private final int targetId;
+ /**
+ * Note that this field doesn't have to be unique among all {@link
StreamEdge}s. It's enough if
+ * this field ensures that all logical instances of {@link StreamEdge} are
unique, and {@link
+ * #hashCode()} are different and {@link #equals(Object)} returns false,
for every possible pair
+ * of {@link StreamEdge}. Especially among two different {@link
StreamEdge}s that are connecting
+ * the same pair of nodes.
+ */
+ private final int uniqueId;
+
/** The type number of the input for co-tasks. */
private final int typeNumber;
/** The side-output tag (if any) of this {@link StreamEdge}. */
@@ -78,7 +87,8 @@ public class StreamEdge implements Serializable {
ALWAYS_FLUSH_BUFFER_TIMEOUT,
outputPartitioner,
outputTag,
- ShuffleMode.UNDEFINED);
+ ShuffleMode.UNDEFINED,
+ 0);
}
public StreamEdge(
@@ -87,7 +97,8 @@ public class StreamEdge implements Serializable {
int typeNumber,
StreamPartitioner<?> outputPartitioner,
OutputTag outputTag,
- ShuffleMode shuffleMode) {
+ ShuffleMode shuffleMode,
+ int uniqueId) {
this(
sourceVertex,
@@ -96,7 +107,8 @@ public class StreamEdge implements Serializable {
sourceVertex.getBufferTimeout(),
outputPartitioner,
outputTag,
- shuffleMode);
+ shuffleMode,
+ uniqueId);
}
public StreamEdge(
@@ -106,10 +118,12 @@ public class StreamEdge implements Serializable {
long bufferTimeout,
StreamPartitioner<?> outputPartitioner,
OutputTag outputTag,
- ShuffleMode shuffleMode) {
+ ShuffleMode shuffleMode,
+ int uniqueId) {
this.sourceId = sourceVertex.getId();
this.targetId = targetVertex.getId();
+ this.uniqueId = uniqueId;
this.typeNumber = typeNumber;
this.bufferTimeout = bufferTimeout;
this.outputPartitioner = outputPartitioner;
@@ -118,7 +132,15 @@ public class StreamEdge implements Serializable {
this.targetOperatorName = targetVertex.getOperatorName();
this.shuffleMode = checkNotNull(shuffleMode);
this.edgeId =
- sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" +
outputPartitioner;
+ sourceVertex
+ + "_"
+ + targetVertex
+ + "_"
+ + typeNumber
+ + "_"
+ + outputPartitioner
+ + "_"
+ + uniqueId;
}
public int getSourceId() {
@@ -198,6 +220,8 @@ public class StreamEdge implements Serializable {
+ bufferTimeout
+ ", outputTag="
+ outputTag
+ + ", uniqueId="
+ + uniqueId
+ ')';
}
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 7b39b19e..d2ecac1 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -614,50 +614,75 @@ public class StreamGraph implements Pipeline {
outputTag,
shuffleMode);
} else {
- StreamNode upstreamNode = getStreamNode(upStreamVertexID);
- StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
- // If no partitioner was specified and the parallelism of upstream
and downstream
- // operator matches use forward partitioning, use rebalance
otherwise.
- if (partitioner == null
- && upstreamNode.getParallelism() ==
downstreamNode.getParallelism()) {
- partitioner = new ForwardPartitioner<Object>();
- } else if (partitioner == null) {
- partitioner = new RebalancePartitioner<Object>();
- }
+ createActualEdge(
+ upStreamVertexID,
+ downStreamVertexID,
+ typeNumber,
+ partitioner,
+ outputTag,
+ shuffleMode);
+ }
+ }
- if (partitioner instanceof ForwardPartitioner) {
- if (upstreamNode.getParallelism() !=
downstreamNode.getParallelism()) {
- throw new UnsupportedOperationException(
- "Forward partitioning does not allow "
- + "change of parallelism. Upstream
operation: "
- + upstreamNode
- + " parallelism: "
- + upstreamNode.getParallelism()
- + ", downstream operation: "
- + downstreamNode
- + " parallelism: "
- + downstreamNode.getParallelism()
- + " You must use another partitioning
strategy, such as broadcast, rebalance, shuffle or global.");
- }
- }
+ private void createActualEdge(
+ Integer upStreamVertexID,
+ Integer downStreamVertexID,
+ int typeNumber,
+ StreamPartitioner<?> partitioner,
+ OutputTag outputTag,
+ ShuffleMode shuffleMode) {
+ StreamNode upstreamNode = getStreamNode(upStreamVertexID);
+ StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+ // If no partitioner was specified and the parallelism of upstream and
downstream
+ // operator matches use forward partitioning, use rebalance otherwise.
+ if (partitioner == null
+ && upstreamNode.getParallelism() ==
downstreamNode.getParallelism()) {
+ partitioner = new ForwardPartitioner<Object>();
+ } else if (partitioner == null) {
+ partitioner = new RebalancePartitioner<Object>();
+ }
- if (shuffleMode == null) {
- shuffleMode = ShuffleMode.UNDEFINED;
+ if (partitioner instanceof ForwardPartitioner) {
+ if (upstreamNode.getParallelism() !=
downstreamNode.getParallelism()) {
+ throw new UnsupportedOperationException(
+ "Forward partitioning does not allow "
+ + "change of parallelism. Upstream operation: "
+ + upstreamNode
+ + " parallelism: "
+ + upstreamNode.getParallelism()
+ + ", downstream operation: "
+ + downstreamNode
+ + " parallelism: "
+ + downstreamNode.getParallelism()
+ + " You must use another partitioning
strategy, such as broadcast, rebalance, shuffle or global.");
}
+ }
- StreamEdge edge =
- new StreamEdge(
- upstreamNode,
- downstreamNode,
- typeNumber,
- partitioner,
- outputTag,
- shuffleMode);
-
- getStreamNode(edge.getSourceId()).addOutEdge(edge);
- getStreamNode(edge.getTargetId()).addInEdge(edge);
+ if (shuffleMode == null) {
+ shuffleMode = ShuffleMode.UNDEFINED;
}
+
+ /**
+ * Just make sure that {@link StreamEdge} connecting same nodes (for
example as a result of
+ * self unioning a {@link DataStream}) are distinct and unique.
Otherwise it would be
+ * difficult on the {@link StreamTask} to assign {@link RecordWriter}s
to correct {@link
+ * StreamEdge}.
+ */
+ int uniqueId = getStreamEdges(upstreamNode.getId(),
downstreamNode.getId()).size();
+
+ StreamEdge edge =
+ new StreamEdge(
+ upstreamNode,
+ downstreamNode,
+ typeNumber,
+ partitioner,
+ outputTag,
+ shuffleMode,
+ uniqueId);
+
+ getStreamNode(edge.getSourceId()).addOutEdge(edge);
+ getStreamNode(edge.getTargetId()).addInEdge(edge);
}
public void setParallelism(Integer vertexID, int parallelism) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 151a4b7..06cbd63 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -45,6 +45,7 @@ import java.util.Optional;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
/** Class representing the operators in the streaming programs, with all their
properties. */
@Internal
@@ -120,6 +121,11 @@ public class StreamNode {
}
public void addInEdge(StreamEdge inEdge) {
+ checkState(
+ outEdges.stream().noneMatch(inEdge::equals),
+ "Adding not unique edge = %s to existing outEdges = %s",
+ inEdge,
+ inEdges);
if (inEdge.getTargetId() != getId()) {
throw new IllegalArgumentException("Destination id doesn't match
the StreamNode id");
} else {
@@ -128,6 +134,11 @@ public class StreamNode {
}
public void addOutEdge(StreamEdge outEdge) {
+ checkState(
+ outEdges.stream().noneMatch(outEdge::equals),
+ "Adding not unique edge = %s to existing outEdges = %s",
+ outEdge,
+ outEdges);
if (outEdge.getSourceId() != getId()) {
throw new IllegalArgumentException("Source id doesn't match the
StreamNode id");
} else {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 1dc9413..d49b129 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -160,6 +160,25 @@ public class TimestampITCase extends TestLogger {
}
@Test
+ public void testSelfUnionWatermarkPropagation() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);
+
+ dataStream1
+ .union(dataStream1)
+ .transform(
+ "Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new
CustomOperator(false))
+ .addSink(new DiscardingSink<>());
+ env.execute();
+
+ assertEquals(
+ Watermark.MAX_WATERMARK,
+ CustomOperator.finalWatermarks[0].get(
+ CustomOperator.finalWatermarks[0].size() - 1));
+ }
+
+ @Test
public void testWatermarkPropagationNoFinalWatermarkOnStop() throws
Exception {
// for this test to work, we need to be sure that no other jobs are
being executed
diff --git a/flink-tests/test/savepoint-921b4e-06351b8d4134/_metadata
b/flink-tests/test/savepoint-921b4e-06351b8d4134/_metadata
new file mode 100644
index 0000000..372a6c4
Binary files /dev/null and
b/flink-tests/test/savepoint-921b4e-06351b8d4134/_metadata differ