Repository: flink Updated Branches: refs/heads/release-1.4 963f14e26 -> 9b59e6ff5
[FLINK-9216][Streaming] Fix comparator violation This closes #5878. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b59e6ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b59e6ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b59e6ff Branch: refs/heads/release-1.4 Commit: 9b59e6ff5adeeef63a696c2837a47b77f4741a57 Parents: 963f14e Author: Xpray <[email protected]> Authored: Mon Apr 23 15:37:39 2018 +0800 Committer: zentol <[email protected]> Committed: Wed May 2 15:23:58 2018 +0200 ---------------------------------------------------------------------- .../streaming/api/graph/JSONGenerator.java | 12 ++++++---- .../api/graph/StreamGraphGeneratorTest.java | 24 ++++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9b59e6ff/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java index a9bb0b6..34a17b8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java @@ -60,14 +60,16 @@ public class JSONGenerator { List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs()); Collections.sort(operatorIDs, new Comparator<Integer>() { @Override - public int compare(Integer o1, Integer o2) { + public int compare(Integer idOne, Integer idTwo) { + boolean isIdOneSinkId = streamGraph.getSinkIDs().contains(idOne); + boolean isIdTwoSinkId = streamGraph.getSinkIDs().contains(idTwo); // put sinks at the back - if (streamGraph.getSinkIDs().contains(o1)) { + if (isIdOneSinkId == isIdTwoSinkId) { + return idOne.compareTo(idTwo); + } else if (isIdOneSinkId) { return 1; - } else if (streamGraph.getSinkIDs().contains(o2)) { - return -1; } else { - return o1 - o2; + return -1; } } }); http://git-wip-us.apache.org/repos/asf/flink/blob/9b59e6ff/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 8149d24..d10fb3c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -413,6 +414,29 @@ public class StreamGraphGeneratorTest { StreamPartitioner<?> streamPartitioner2 = keyedResultNode.getInEdges().get(1).getPartitioner(); } + /** + * Tests that the json generated by JSONGenerator shall meet with 2 requirements: + * 1. sink nodes are at the back + * 2. if both two nodes are sink nodes or neither of them is sink node, then sort by its id. + */ + @Test + public void testSinkIdComparison() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Integer> source = env.fromElements(1, 2, 3); + for (int i = 0; i < 32; i++) { + if (i % 2 == 0) { + source.addSink(new SinkFunction<Integer>() { + @Override + public void invoke(Integer value) throws Exception {} + }); + } else { + source.map(x -> x + 1); + } + } + // IllegalArgumentException will be thrown without FLINK-9216 + env.getStreamGraph().getStreamingPlanAsJSON(); + } + private static class OutputTypeConfigurableOperationWithTwoInputs extends AbstractStreamOperator<Integer> implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
