[hotfix] Create BroadcastITCase.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5050f911
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5050f911
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5050f911

Branch: refs/heads/master
Commit: 5050f9119c4c83765019d1da51d4c1bff6f2f44b
Parents: 2876823
Author: kkloudas <[email protected]>
Authored: Fri Feb 2 14:59:34 2018 +0100
Committer: kkloudas <[email protected]>
Committed: Wed Feb 7 14:09:16 2018 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/DataStreamTest.java     | 124 +++----------
 .../streaming/runtime/BroadcastStateITCase.java | 183 +++++++++++++++++++
 2 files changed, 207 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5050f911/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index ca76ef4..ec8a134 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -87,9 +88,7 @@ import org.junit.rules.ExpectedException;
 import javax.annotation.Nullable;
 
 import java.lang.reflect.Method;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -103,6 +102,9 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public class DataStreamTest extends TestLogger {
 
+       @Rule
+       public ExpectedException expectedException = ExpectedException.none();
+
        /**
         * Tests union functionality. This ensures that self-unions and unions 
of streams
         * with differing parallelism work.
@@ -763,99 +765,10 @@ public class DataStreamTest extends TestLogger {
                assertTrue(getOperatorForDataStream(processed) instanceof 
ProcessOperator);
        }
 
-       @Test
-       public void testConnectWithBroadcastTranslation() throws Exception {
-
-               final Map<Long, String> expected = new HashMap<>();
-               expected.put(0L, "test:0");
-               expected.put(1L, "test:1");
-               expected.put(2L, "test:2");
-               expected.put(3L, "test:3");
-               expected.put(4L, "test:4");
-               expected.put(5L, "test:5");
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-               final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
-                               .assignTimestampsAndWatermarks(new 
CustomWmEmitter<Long>() {
-
-                                       @Override
-                                       public long extractTimestamp(Long 
element, long previousElementTimestamp) {
-                                               return element;
-                                       }
-                               }).keyBy((KeySelector<Long, Long>) value -> 
value);
-
-               final DataStream<String> srcTwo = 
env.fromCollection(expected.values())
-                               .assignTimestampsAndWatermarks(new 
CustomWmEmitter<String>() {
-                                       @Override
-                                       public long extractTimestamp(String 
element, long previousElementTimestamp) {
-                                               return 
Long.parseLong(element.split(":")[1]);
-                                       }
-                               });
-
-               final BroadcastStream<String> broadcast = 
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
-
-               // the timestamp should be high enough to trigger the timer 
after all the elements arrive.
-               final DataStream<String> output = 
srcOne.connect(broadcast).process(
-                               new TestBroadcastProcessFunction(100000L, 
expected));
-
-               output.addSink(new DiscardingSink<>());
-               env.execute();
-       }
-
-       private abstract static class CustomWmEmitter<T> implements 
AssignerWithPunctuatedWatermarks<T> {
-
-               @Nullable
-               @Override
-               public Watermark checkAndGetNextWatermark(T lastElement, long 
extractedTimestamp) {
-                       return new Watermark(extractedTimestamp);
-               }
-       }
-
-       private static class TestBroadcastProcessFunction extends 
KeyedBroadcastProcessFunction<Long, Long, String, String> {
-
-               private final Map<Long, String> expectedState;
-
-               private final long timerTimestamp;
-
-               static final MapStateDescriptor<Long, String> DESCRIPTOR = new 
MapStateDescriptor<>(
-                               "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
-               );
-
-               TestBroadcastProcessFunction(
-                               final long timerTS,
-                               final Map<Long, String> expectedBroadcastState
-               ) {
-                       expectedState = expectedBroadcastState;
-                       timerTimestamp = timerTS;
-               }
-
-               @Override
-               public void processElement(Long value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
-                       
ctx.timerService().registerEventTimeTimer(timerTimestamp);
-               }
-
-               @Override
-               public void processBroadcastElement(String value, KeyedContext 
ctx, Collector<String> out) throws Exception {
-                       long key = Long.parseLong(value.split(":")[1]);
-                       ctx.getBroadcastState(DESCRIPTOR).put(key, value);
-               }
-
-               @Override
-               public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out) throws Exception {
-                       Map<Long, String> map = new HashMap<>();
-                       for (Map.Entry<Long, String> entry : 
ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) {
-                               map.put(entry.getKey(), entry.getValue());
-                       }
-                       Assert.assertEquals(expectedState, map);
-               }
-       }
-
        /**
         * Tests that with a {@link KeyedStream} we have to provide a {@link 
KeyedBroadcastProcessFunction}.
         */
-       @Test(expected = IllegalArgumentException.class)
+       @Test
        public void testFailedTranslationOnKeyed() {
 
                final MapStateDescriptor<Long, String> descriptor = new 
MapStateDescriptor<>(
@@ -881,8 +794,11 @@ public class DataStreamTest extends TestLogger {
                                });
 
                BroadcastStream<String> broadcast = 
srcTwo.broadcast(descriptor);
-               srcOne.connect(broadcast)
-                               .process(new BroadcastProcessFunction<Long, 
String, String>() {
+               BroadcastConnectedStream<Long, String> bcStream = 
srcOne.connect(broadcast);
+
+               expectedException.expect(IllegalArgumentException.class);
+               bcStream.process(
+                               new BroadcastProcessFunction<Long, String, 
String>() {
                                        @Override
                                        public void 
processBroadcastElement(String value, Context ctx, Collector<String> out) 
throws Exception {
                                                // do nothing
@@ -898,7 +814,7 @@ public class DataStreamTest extends TestLogger {
        /**
         * Tests that with a non-keyed stream we have to provide a {@link 
BroadcastProcessFunction}.
         */
-       @Test(expected = IllegalArgumentException.class)
+       @Test
        public void testFailedTranslationOnNonKeyed() {
 
                final MapStateDescriptor<Long, String> descriptor = new 
MapStateDescriptor<>(
@@ -924,9 +840,11 @@ public class DataStreamTest extends TestLogger {
                                });
 
                BroadcastStream<String> broadcast = 
srcTwo.broadcast(descriptor);
-               srcOne.connect(broadcast)
-                               .process(new 
KeyedBroadcastProcessFunction<String, Long, String, String>() {
+               BroadcastConnectedStream<Long, String> bcStream = 
srcOne.connect(broadcast);
 
+               expectedException.expect(IllegalArgumentException.class);
+               bcStream.process(
+                               new KeyedBroadcastProcessFunction<String, Long, 
String, String>() {
                                        @Override
                                        public void 
processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) 
throws Exception {
                                                // do nothing
@@ -939,6 +857,15 @@ public class DataStreamTest extends TestLogger {
                                });
        }
 
+       private abstract static class CustomWmEmitter<T> implements 
AssignerWithPunctuatedWatermarks<T> {
+
+               @Nullable
+               @Override
+               public Watermark checkAndGetNextWatermark(T lastElement, long 
extractedTimestamp) {
+                       return new Watermark(extractedTimestamp);
+               }
+       }
+
        @Test
        public void operatorTest() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1131,9 +1058,6 @@ public class DataStreamTest extends TestLogger {
        // KeyBy testing
        /////////////////////////////////////////////////////////////
 
-       @Rule
-       public ExpectedException expectedException = ExpectedException.none();
-
        @Test
        public void testPrimitiveArrayKeyRejection() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5050f911/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
new file mode 100644
index 0000000..4b0b9c5
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ITCase for the {@link org.apache.flink.api.common.state.BroadcastState}.
+ */
+public class BroadcastStateITCase {
+
+       @Test
+       public void testConnectWithBroadcastTranslation() throws Exception {
+
+               final Map<Long, String> expected = new HashMap<>();
+               expected.put(0L, "test:0");
+               expected.put(1L, "test:1");
+               expected.put(2L, "test:2");
+               expected.put(3L, "test:3");
+               expected.put(4L, "test:4");
+               expected.put(5L, "test:5");
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
+                               .assignTimestampsAndWatermarks(new 
CustomWmEmitter<Long>() {
+
+                                       private static final long 
serialVersionUID = -8500904795760316195L;
+
+                                       @Override
+                                       public long extractTimestamp(Long 
element, long previousElementTimestamp) {
+                                               return element;
+                                       }
+                               }).keyBy((KeySelector<Long, Long>) value -> 
value);
+
+               final DataStream<String> srcTwo = 
env.fromCollection(expected.values())
+                               .assignTimestampsAndWatermarks(new 
CustomWmEmitter<String>() {
+
+                                       private static final long 
serialVersionUID = -2148318224248467213L;
+
+                                       @Override
+                                       public long extractTimestamp(String 
element, long previousElementTimestamp) {
+                                               return 
Long.parseLong(element.split(":")[1]);
+                                       }
+                               });
+
+               final BroadcastStream<String> broadcast = 
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+               // the timestamp should be high enough to trigger the timer 
after all the elements arrive.
+               final DataStream<String> output = 
srcOne.connect(broadcast).process(
+                               new TestBroadcastProcessFunction(100000L, 
expected));
+
+               output
+                               .addSink(new TestSink(expected.size()))
+                               .setParallelism(1);
+               env.execute();
+       }
+
+       private static class TestSink extends RichSinkFunction<String> {
+
+               private static final long serialVersionUID = 
7252508825104554749L;
+
+               private final int expectedOutputCounter;
+
+               private int outputCounter;
+
+               TestSink(int expectedOutputCounter) {
+                       this.expectedOutputCounter = expectedOutputCounter;
+                       this.outputCounter = 0;
+               }
+
+               @Override
+               public void invoke(String value, Context context) throws 
Exception {
+                       outputCounter++;
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+
+                       // make sure that all the timers fired
+                       Assert.assertEquals(expectedOutputCounter, 
outputCounter);
+               }
+       }
+
+       private abstract static class CustomWmEmitter<T> implements 
AssignerWithPunctuatedWatermarks<T> {
+
+               private static final long serialVersionUID = 
-5187335197674841233L;
+
+               @Nullable
+               @Override
+               public Watermark checkAndGetNextWatermark(T lastElement, long 
extractedTimestamp) {
+                       return new Watermark(extractedTimestamp);
+               }
+       }
+
+       /**
+        * A {@link KeyedBroadcastProcessFunction} which on the broadcast side 
puts elements in the broadcast state
+        * while on the non-broadcast side, it sets a timer to fire at some 
point in the future. Finally, when the onTimer
+        * method is called (i.e. when the timer fires), we verify that the 
result is the expected one.
+        */
+       private static class TestBroadcastProcessFunction extends 
KeyedBroadcastProcessFunction<Long, Long, String, String> {
+
+               private static final long serialVersionUID = 
7616910653561100842L;
+
+               private final Map<Long, String> expectedState;
+
+               private final long timerTimestamp;
+
+               static final MapStateDescriptor<Long, String> DESCRIPTOR = new 
MapStateDescriptor<>(
+                               "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+               );
+
+               TestBroadcastProcessFunction(
+                               final long timerTS,
+                               final Map<Long, String> expectedBroadcastState
+               ) {
+                       expectedState = expectedBroadcastState;
+                       timerTimestamp = timerTS;
+               }
+
+               @Override
+               public void processElement(Long value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
+                       
ctx.timerService().registerEventTimeTimer(timerTimestamp);
+               }
+
+               @Override
+               public void processBroadcastElement(String value, KeyedContext 
ctx, Collector<String> out) throws Exception {
+                       long key = Long.parseLong(value.split(":")[1]);
+                       ctx.getBroadcastState(DESCRIPTOR).put(key, value);
+               }
+
+               @Override
+               public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out) throws Exception {
+                       Assert.assertEquals(timerTimestamp, timestamp);
+
+                       Map<Long, String> map = new HashMap<>();
+                       for (Map.Entry<Long, String> entry : 
ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) {
+                               map.put(entry.getKey(), entry.getValue());
+                       }
+
+                       Assert.assertEquals(expectedState, map);
+
+                       out.collect(Long.toString(timestamp));
+               }
+       }
+}

Reply via email to