Repository: flink
Updated Branches:
  refs/heads/release-1.4 963f14e26 -> 9b59e6ff5


[FLINK-9216][Streaming] Fix comparator violation

This closes #5878.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b59e6ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b59e6ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b59e6ff

Branch: refs/heads/release-1.4
Commit: 9b59e6ff5adeeef63a696c2837a47b77f4741a57
Parents: 963f14e
Author: Xpray <[email protected]>
Authored: Mon Apr 23 15:37:39 2018 +0800
Committer: zentol <[email protected]>
Committed: Wed May 2 15:23:58 2018 +0200

----------------------------------------------------------------------
 .../streaming/api/graph/JSONGenerator.java      | 12 ++++++----
 .../api/graph/StreamGraphGeneratorTest.java     | 24 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b59e6ff/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index a9bb0b6..34a17b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -60,14 +60,16 @@ public class JSONGenerator {
                List<Integer> operatorIDs = new 
ArrayList<Integer>(streamGraph.getVertexIDs());
                Collections.sort(operatorIDs, new Comparator<Integer>() {
                        @Override
-                       public int compare(Integer o1, Integer o2) {
+                       public int compare(Integer idOne, Integer idTwo) {
+                               boolean isIdOneSinkId = 
streamGraph.getSinkIDs().contains(idOne);
+                               boolean isIdTwoSinkId = 
streamGraph.getSinkIDs().contains(idTwo);
                                // put sinks at the back
-                               if (streamGraph.getSinkIDs().contains(o1)) {
+                               if (isIdOneSinkId == isIdTwoSinkId) {
+                                       return idOne.compareTo(idTwo);
+                               } else if (isIdOneSinkId) {
                                        return 1;
-                               } else if 
(streamGraph.getSinkIDs().contains(o2)) {
-                                       return -1;
                                } else {
-                                       return o1 - o2;
+                                       return -1;
                                }
                        }
                });

http://git-wip-us.apache.org/repos/asf/flink/blob/9b59e6ff/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 8149d24..d10fb3c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -413,6 +414,29 @@ public class StreamGraphGeneratorTest {
                StreamPartitioner<?> streamPartitioner2 = 
keyedResultNode.getInEdges().get(1).getPartitioner();
        }
 
+       /**
+        * Tests that the json generated by JSONGenerator shall meet with 2 
requirements:
+        * 1. sink nodes are at the back
+        * 2. if both two nodes are sink nodes or neither of them is sink node, 
then sort by its id.
+        */
+       @Test
+       public void testSinkIdComparison() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStream<Integer> source = env.fromElements(1, 2, 3);
+               for (int i = 0; i < 32; i++) {
+                       if (i % 2 == 0) {
+                               source.addSink(new SinkFunction<Integer>() {
+                                       @Override
+                                       public void invoke(Integer value) 
throws Exception {}
+                               });
+                       } else {
+                               source.map(x -> x + 1);
+                       }
+               }
+               // IllegalArgumentException will be thrown without FLINK-9216
+               env.getStreamGraph().getStreamingPlanAsJSON();
+       }
+
        private static class OutputTypeConfigurableOperationWithTwoInputs
                        extends AbstractStreamOperator<Integer>
                        implements TwoInputStreamOperator<Integer, Integer, 
Integer>, OutputTypeConfigurable<Integer> {

Reply via email to