Repository: flink
Updated Branches:
  refs/heads/master 612e2d834 -> f3d4011f8


[FLINK-9152] Add simple ITCase for non-keyed Broadcast Connect translation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3d4011f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3d4011f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3d4011f

Branch: refs/heads/master
Commit: f3d4011f80b3d6b25e55a354d1a41928074f02a0
Parents: 48d05f1
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Apr 11 17:47:03 2018 -0700
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Apr 12 08:04:30 2018 -0700

----------------------------------------------------------------------
 .../streaming/runtime/BroadcastStateITCase.java | 93 +++++++++++++++++++-
 1 file changed, 89 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3d4011f/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 8f442c0..9400614 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -47,7 +48,7 @@ import static org.junit.Assert.assertEquals;
 public class BroadcastStateITCase {
 
        @Test
-       public void testConnectWithBroadcastTranslation() throws Exception {
+       public void testKeyedWithBroadcastTranslation() throws Exception {
 
                final MapStateDescriptor<Long, String> utterDescriptor = new 
MapStateDescriptor<>(
                                "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
@@ -90,7 +91,7 @@ public class BroadcastStateITCase {
 
                // the timestamp should be high enough to trigger the timer 
after all the elements arrive.
                final DataStream<String> output = 
srcOne.connect(broadcast).process(
-                               new TestBroadcastProcessFunction(100000L, 
expected));
+                               new TestKeyedBroadcastProcessFunction(100000L, 
expected));
 
                output
                                .addSink(new TestSink(expected.size()))
@@ -98,6 +99,58 @@ public class BroadcastStateITCase {
                env.execute();
        }
 
+       @Test
+       public void testBroadcastTranslation() throws Exception {
+
+               final MapStateDescriptor<Long, String> utterDescriptor = new 
MapStateDescriptor<>(
+                       "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO
+               );
+
+               final Map<Long, String> expected = new HashMap<>();
+               expected.put(0L, "test:0");
+               expected.put(1L, "test:1");
+               expected.put(2L, "test:2");
+               expected.put(3L, "test:3");
+               expected.put(4L, "test:4");
+               expected.put(5L, "test:5");
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
+                       .assignTimestampsAndWatermarks(new 
CustomWmEmitter<Long>() {
+
+                               private static final long serialVersionUID = 
-8500904795760316195L;
+
+                               @Override
+                               public long extractTimestamp(Long element, long 
previousElementTimestamp) {
+                                       return element;
+                               }
+                       });
+
+               final DataStream<String> srcTwo = 
env.fromCollection(expected.values())
+                       .assignTimestampsAndWatermarks(new 
CustomWmEmitter<String>() {
+
+                               private static final long serialVersionUID = 
-2148318224248467213L;
+
+                               @Override
+                               public long extractTimestamp(String element, 
long previousElementTimestamp) {
+                                       return 
Long.parseLong(element.split(":")[1]);
+                               }
+                       });
+
+               final BroadcastStream<String> broadcast = 
srcTwo.broadcast(utterDescriptor);
+
+               // the non-keyed function registers no timers; this only checks 
that the broadcast connect translates and runs.
+               final DataStream<String> output = 
srcOne.connect(broadcast).process(
+                       new TestBroadcastProcessFunction());
+
+               output
+                       .addSink(new TestSink(0))
+                       .setParallelism(1);
+               env.execute();
+       }
+
        private static class TestSink extends RichSinkFunction<String> {
 
                private static final long serialVersionUID = 
7252508825104554749L;
@@ -141,7 +194,7 @@ public class BroadcastStateITCase {
         * while on the non-broadcast side, it sets a timer to fire at some 
point in the future. Finally, when the onTimer
         * method is called (i.e. when the timer fires), we verify that the 
result is the expected one.
         */
-       private static class TestBroadcastProcessFunction extends 
KeyedBroadcastProcessFunction<Long, Long, String, String> {
+       private static class TestKeyedBroadcastProcessFunction extends 
KeyedBroadcastProcessFunction<Long, Long, String, String> {
 
                private static final long serialVersionUID = 
7616910653561100842L;
 
@@ -152,7 +205,7 @@ public class BroadcastStateITCase {
 
                private transient MapStateDescriptor<Long, String> descriptor;
 
-               TestBroadcastProcessFunction(final long initialTimerTimestamp, 
final Map<Long, String> expectedBroadcastState) {
+               TestKeyedBroadcastProcessFunction(final long 
initialTimerTimestamp, final Map<Long, String> expectedBroadcastState) {
                        expectedState = expectedBroadcastState;
                        nextTimerTimestamp = initialTimerTimestamp;
                }
@@ -194,4 +247,36 @@ public class BroadcastStateITCase {
                        out.collect(Long.toString(timestamp));
                }
        }
+
+       /**
+        * This doesn't do much but we use it to verify that translation of 
non-keyed broadcast connect
+        * works.
+        */
+       private static class TestBroadcastProcessFunction extends
+                       BroadcastProcessFunction<Long, String, String> {
+
+               private static final long serialVersionUID = 
7616910653561100842L;
+
+               private transient MapStateDescriptor<Long, String> descriptor;
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+
+                       descriptor = new MapStateDescriptor<>(
+                               "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                       );
+               }
+
+               @Override
+               public void processElement(Long value, ReadOnlyContext ctx, 
Collector<String> out) throws Exception {
+               }
+
+               @Override
+               public void processBroadcastElement(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       long key = Long.parseLong(value.split(":")[1]);
+                       ctx.getBroadcastState(descriptor).put(key, value);
+               }
+       }
+
 }

Reply via email to