[hotfix] Create BroadcastStateITCase.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5050f911 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5050f911 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5050f911 Branch: refs/heads/master Commit: 5050f9119c4c83765019d1da51d4c1bff6f2f44b Parents: 2876823 Author: kkloudas <[email protected]> Authored: Fri Feb 2 14:59:34 2018 +0100 Committer: kkloudas <[email protected]> Committed: Wed Feb 7 14:09:16 2018 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/DataStreamTest.java | 124 +++---------- .../streaming/runtime/BroadcastStateITCase.java | 183 +++++++++++++++++++ 2 files changed, 207 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5050f911/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index ca76ef4..ec8a134 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -38,6 +38,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; 
@@ -87,9 +88,7 @@ import org.junit.rules.ExpectedException; import javax.annotation.Nullable; import java.lang.reflect.Method; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -103,6 +102,9 @@ import static org.junit.Assert.fail; @SuppressWarnings("serial") public class DataStreamTest extends TestLogger { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + /** * Tests union functionality. This ensures that self-unions and unions of streams * with differing parallelism work. @@ -763,99 +765,10 @@ public class DataStreamTest extends TestLogger { assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator); } - @Test - public void testConnectWithBroadcastTranslation() throws Exception { - - final Map<Long, String> expected = new HashMap<>(); - expected.put(0L, "test:0"); - expected.put(1L, "test:1"); - expected.put(2L, "test:2"); - expected.put(3L, "test:3"); - expected.put(4L, "test:4"); - expected.put(5L, "test:5"); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - final DataStream<Long> srcOne = env.generateSequence(0L, 5L) - .assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() { - - @Override - public long extractTimestamp(Long element, long previousElementTimestamp) { - return element; - } - }).keyBy((KeySelector<Long, Long>) value -> value); - - final DataStream<String> srcTwo = env.fromCollection(expected.values()) - .assignTimestampsAndWatermarks(new CustomWmEmitter<String>() { - @Override - public long extractTimestamp(String element, long previousElementTimestamp) { - return Long.parseLong(element.split(":")[1]); - } - }); - - final BroadcastStream<String> broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR); - - // the timestamp should be high enough to trigger 
the timer after all the elements arrive. - final DataStream<String> output = srcOne.connect(broadcast).process( - new TestBroadcastProcessFunction(100000L, expected)); - - output.addSink(new DiscardingSink<>()); - env.execute(); - } - - private abstract static class CustomWmEmitter<T> implements AssignerWithPunctuatedWatermarks<T> { - - @Nullable - @Override - public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) { - return new Watermark(extractedTimestamp); - } - } - - private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, Long, String, String> { - - private final Map<Long, String> expectedState; - - private final long timerTimestamp; - - static final MapStateDescriptor<Long, String> DESCRIPTOR = new MapStateDescriptor<>( - "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO - ); - - TestBroadcastProcessFunction( - final long timerTS, - final Map<Long, String> expectedBroadcastState - ) { - expectedState = expectedBroadcastState; - timerTimestamp = timerTS; - } - - @Override - public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { - ctx.timerService().registerEventTimeTimer(timerTimestamp); - } - - @Override - public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception { - long key = Long.parseLong(value.split(":")[1]); - ctx.getBroadcastState(DESCRIPTOR).put(key, value); - } - - @Override - public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { - Map<Long, String> map = new HashMap<>(); - for (Map.Entry<Long, String> entry : ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) { - map.put(entry.getKey(), entry.getValue()); - } - Assert.assertEquals(expectedState, map); - } - } - /** * Tests that with a {@link KeyedStream} we have to provide a {@link KeyedBroadcastProcessFunction}. 
*/ - @Test(expected = IllegalArgumentException.class) + @Test public void testFailedTranslationOnKeyed() { final MapStateDescriptor<Long, String> descriptor = new MapStateDescriptor<>( @@ -881,8 +794,11 @@ public class DataStreamTest extends TestLogger { }); BroadcastStream<String> broadcast = srcTwo.broadcast(descriptor); - srcOne.connect(broadcast) - .process(new BroadcastProcessFunction<Long, String, String>() { + BroadcastConnectedStream<Long, String> bcStream = srcOne.connect(broadcast); + + expectedException.expect(IllegalArgumentException.class); + bcStream.process( + new BroadcastProcessFunction<Long, String, String>() { @Override public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { // do nothing @@ -898,7 +814,7 @@ public class DataStreamTest extends TestLogger { /** * Tests that with a non-keyed stream we have to provide a {@link BroadcastProcessFunction}. */ - @Test(expected = IllegalArgumentException.class) + @Test public void testFailedTranslationOnNonKeyed() { final MapStateDescriptor<Long, String> descriptor = new MapStateDescriptor<>( @@ -924,9 +840,11 @@ public class DataStreamTest extends TestLogger { }); BroadcastStream<String> broadcast = srcTwo.broadcast(descriptor); - srcOne.connect(broadcast) - .process(new KeyedBroadcastProcessFunction<String, Long, String, String>() { + BroadcastConnectedStream<Long, String> bcStream = srcOne.connect(broadcast); + expectedException.expect(IllegalArgumentException.class); + bcStream.process( + new KeyedBroadcastProcessFunction<String, Long, String, String>() { @Override public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception { // do nothing @@ -939,6 +857,15 @@ public class DataStreamTest extends TestLogger { }); } + private abstract static class CustomWmEmitter<T> implements AssignerWithPunctuatedWatermarks<T> { + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(T lastElement, long 
extractedTimestamp) { + return new Watermark(extractedTimestamp); + } + } + @Test public void operatorTest() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -1131,9 +1058,6 @@ public class DataStreamTest extends TestLogger { // KeyBy testing ///////////////////////////////////////////////////////////// - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Test public void testPrimitiveArrayKeyRejection() { http://git-wip-us.apache.org/repos/asf/flink/blob/5050f911/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java new file mode 100644 index 0000000..4b0b9c5 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.streaming.runtime; + +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; + +/** + * ITCase for the {@link org.apache.flink.api.common.state.BroadcastState}. 
+ */ +public class BroadcastStateITCase { + + @Test + public void testConnectWithBroadcastTranslation() throws Exception { + + final Map<Long, String> expected = new HashMap<>(); + expected.put(0L, "test:0"); + expected.put(1L, "test:1"); + expected.put(2L, "test:2"); + expected.put(3L, "test:3"); + expected.put(4L, "test:4"); + expected.put(5L, "test:5"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + final DataStream<Long> srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() { + + private static final long serialVersionUID = -8500904795760316195L; + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }).keyBy((KeySelector<Long, Long>) value -> value); + + final DataStream<String> srcTwo = env.fromCollection(expected.values()) + .assignTimestampsAndWatermarks(new CustomWmEmitter<String>() { + + private static final long serialVersionUID = -2148318224248467213L; + + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); + + final BroadcastStream<String> broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR); + + // the timestamp should be high enough to trigger the timer after all the elements arrive. 
+ final DataStream<String> output = srcOne.connect(broadcast).process( + new TestBroadcastProcessFunction(100000L, expected)); + + output + .addSink(new TestSink(expected.size())) + .setParallelism(1); + env.execute(); + } + + private static class TestSink extends RichSinkFunction<String> { + + private static final long serialVersionUID = 7252508825104554749L; + + private final int expectedOutputCounter; + + private int outputCounter; + + TestSink(int expectedOutputCounter) { + this.expectedOutputCounter = expectedOutputCounter; + this.outputCounter = 0; + } + + @Override + public void invoke(String value, Context context) throws Exception { + outputCounter++; + } + + @Override + public void close() throws Exception { + super.close(); + + // make sure that all the timers fired + Assert.assertEquals(expectedOutputCounter, outputCounter); + } + } + + private abstract static class CustomWmEmitter<T> implements AssignerWithPunctuatedWatermarks<T> { + + private static final long serialVersionUID = -5187335197674841233L; + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) { + return new Watermark(extractedTimestamp); + } + } + + /** + * A {@link KeyedBroadcastProcessFunction} which on the broadcast side puts elements in the broadcast state + * while on the non-broadcast side, it sets a timer to fire at some point in the future. Finally, when the onTimer + * method is called (i.e. when the timer fires), we verify that the result is the expected one. 
+ */ + private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, Long, String, String> { + + private static final long serialVersionUID = 7616910653561100842L; + + private final Map<Long, String> expectedState; + + private final long timerTimestamp; + + static final MapStateDescriptor<Long, String> DESCRIPTOR = new MapStateDescriptor<>( + "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + + TestBroadcastProcessFunction( + final long timerTS, + final Map<Long, String> expectedBroadcastState + ) { + expectedState = expectedBroadcastState; + timerTimestamp = timerTS; + } + + @Override + public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + ctx.timerService().registerEventTimeTimer(timerTimestamp); + } + + @Override + public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception { + long key = Long.parseLong(value.split(":")[1]); + ctx.getBroadcastState(DESCRIPTOR).put(key, value); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { + Assert.assertEquals(timerTimestamp, timestamp); + + Map<Long, String> map = new HashMap<>(); + for (Map.Entry<Long, String> entry : ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) { + map.put(entry.getKey(), entry.getValue()); + } + + Assert.assertEquals(expectedState, map); + + out.collect(Long.toString(timestamp)); + } + } +}
