Repository: flink Updated Branches: refs/heads/master 612e2d834 -> f3d4011f8
[FLINK-9152] Add simple ITCase for non-keyed Broadcast Connect translation Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3d4011f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3d4011f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3d4011f Branch: refs/heads/master Commit: f3d4011f80b3d6b25e55a354d1a41928074f02a0 Parents: 48d05f1 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Apr 11 17:47:03 2018 -0700 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Apr 12 08:04:30 2018 -0700 ---------------------------------------------------------------------- .../streaming/runtime/BroadcastStateITCase.java | 93 +++++++++++++++++++- 1 file changed, 89 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f3d4011f/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java index 8f442c0..9400614 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.watermark.Watermark; @@ -47,7 +48,7 @@ import static org.junit.Assert.assertEquals; public class BroadcastStateITCase { @Test - public void testConnectWithBroadcastTranslation() throws Exception { + public void testKeyedWithBroadcastTranslation() throws Exception { final MapStateDescriptor<Long, String> utterDescriptor = new MapStateDescriptor<>( "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO @@ -90,7 +91,7 @@ public class BroadcastStateITCase { // the timestamp should be high enough to trigger the timer after all the elements arrive. final DataStream<String> output = srcOne.connect(broadcast).process( - new TestBroadcastProcessFunction(100000L, expected)); + new TestKeyedBroadcastProcessFunction(100000L, expected)); output .addSink(new TestSink(expected.size())) @@ -98,6 +99,58 @@ public class BroadcastStateITCase { env.execute(); } + @Test + public void testBroadcastTranslation() throws Exception { + + final MapStateDescriptor<Long, String> utterDescriptor = new MapStateDescriptor<>( + "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + + final Map<Long, String> expected = new HashMap<>(); + expected.put(0L, "test:0"); + expected.put(1L, "test:1"); + expected.put(2L, "test:2"); + expected.put(3L, "test:3"); + expected.put(4L, "test:4"); + expected.put(5L, "test:5"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + final DataStream<Long> srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() { + + private static final long serialVersionUID = -8500904795760316195L; + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }); + + final DataStream<String> srcTwo = env.fromCollection(expected.values()) + .assignTimestampsAndWatermarks(new CustomWmEmitter<String>() { + + private static final long serialVersionUID = -2148318224248467213L; + + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); + + final BroadcastStream<String> broadcast = srcTwo.broadcast(utterDescriptor); + + // the timestamp should be high enough to trigger the timer after all the elements arrive. + final DataStream<String> output = srcOne.connect(broadcast).process( + new TestBroadcastProcessFunction()); + + output + .addSink(new TestSink(0)) + .setParallelism(1); + env.execute(); + } + private static class TestSink extends RichSinkFunction<String> { private static final long serialVersionUID = 7252508825104554749L; @@ -141,7 +194,7 @@ public class BroadcastStateITCase { * while on the non-broadcast side, it sets a timer to fire at some point in the future. Finally, when the onTimer * method is called (i.e. when the timer fires), we verify that the result is the expected one. */ - private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, Long, String, String> { + private static class TestKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, Long, String, String> { private static final long serialVersionUID = 7616910653561100842L; @@ -152,7 +205,7 @@ public class BroadcastStateITCase { private transient MapStateDescriptor<Long, String> descriptor; - TestBroadcastProcessFunction(final long initialTimerTimestamp, final Map<Long, String> expectedBroadcastState) { + TestKeyedBroadcastProcessFunction(final long initialTimerTimestamp, final Map<Long, String> expectedBroadcastState) { expectedState = expectedBroadcastState; nextTimerTimestamp = initialTimerTimestamp; } @@ -194,4 +247,36 @@ public class BroadcastStateITCase { out.collect(Long.toString(timestamp)); } } + + /** + * This doesn't do much but we use it to verify that translation of non-keyed broadcast connect + * works. + */ + private static class TestBroadcastProcessFunction extends + BroadcastProcessFunction<Long, String, String> { + + private static final long serialVersionUID = 7616910653561100842L; + + private transient MapStateDescriptor<Long, String> descriptor; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + descriptor = new MapStateDescriptor<>( + "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + } + + @Override + public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception { + } + + @Override + public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { + long key = Long.parseLong(value.split(":")[1]); + ctx.getBroadcastState(descriptor).put(key, value); + } + } + }