Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5230#discussion_r165398176
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
---
@@ -753,6 +763,182 @@ public void onTimer(
assertTrue(getOperatorForDataStream(processed) instanceof
ProcessOperator);
}
+ @Test
+ public void testConnectWithBroadcastTranslation() throws Exception {
+
+ final Map<Long, String> expected = new HashMap<>();
+ expected.put(0L, "test:0");
+ expected.put(1L, "test:1");
+ expected.put(2L, "test:2");
+ expected.put(3L, "test:3");
+ expected.put(4L, "test:4");
+ expected.put(5L, "test:5");
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
+ .assignTimestampsAndWatermarks(new
CustomWmEmitter<Long>() {
+
+ @Override
+ public long extractTimestamp(Long
element, long previousElementTimestamp) {
+ return element;
+ }
+ }).keyBy((KeySelector<Long, Long>) value ->
value);
+
+ final DataStream<String> srcTwo =
env.fromCollection(expected.values())
+ .assignTimestampsAndWatermarks(new
CustomWmEmitter<String>() {
+ @Override
+ public long extractTimestamp(String
element, long previousElementTimestamp) {
+ return
Long.parseLong(element.split(":")[1]);
+ }
+ });
+
+ final BroadcastStream<String> broadcast =
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+ // the timestamp should be high enough to trigger the timer
after all the elements arrive.
+ final DataStream<String> output =
srcOne.connect(broadcast).process(
+ new TestBroadcastProcessFunction(100000L,
expected));
+
+ output.addSink(new DiscardingSink<>());
+ env.execute();
+ }
+
+ private abstract static class CustomWmEmitter<T> implements
AssignerWithPunctuatedWatermarks<T> {
+
+ @Nullable
+ @Override
+ public Watermark checkAndGetNextWatermark(T lastElement, long
extractedTimestamp) {
+ return new Watermark(extractedTimestamp);
+ }
+ }
+
+ private static class TestBroadcastProcessFunction extends
KeyedBroadcastProcessFunction<Long, Long, String, String> {
+
+ private final Map<Long, String> expectedState;
+
+ private final long timerTimestamp;
+
+ static final MapStateDescriptor<Long, String> DESCRIPTOR = new
MapStateDescriptor<>(
+ "broadcast-state",
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+ );
+
+ TestBroadcastProcessFunction(
+ final long timerTS,
+ final Map<Long, String> expectedBroadcastState
+ ) {
+ expectedState = expectedBroadcastState;
+ timerTimestamp = timerTS;
+ }
+
+ @Override
+ public void processElement(Long value, KeyedReadOnlyContext
ctx, Collector<String> out) throws Exception {
+
ctx.timerService().registerEventTimeTimer(timerTimestamp);
+ }
+
+ @Override
+ public void processBroadcastElement(String value,
KeyedReadWriteContext ctx, Collector<String> out) throws Exception {
+ long key = Long.parseLong(value.split(":")[1]);
+ ctx.getBroadcastState(DESCRIPTOR).put(key, value);
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out) throws Exception {
+ Map<Long, String> map = new HashMap<>();
+ for (Map.Entry<Long, String> entry :
ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) {
+ map.put(entry.getKey(), entry.getValue());
+ }
+ Assert.assertEquals(expectedState, map);
+ }
+ }
+
+ /**
+ * Tests that with a {@link KeyedStream} we have to provide a {@link
KeyedBroadcastProcessFunction}.
+ */
+ @Test(expected = IllegalArgumentException.class)
--- End diff --
The verification of the exception should happen right at the call-site
where we expect the exception. As it is now, the exception can be thrown
anywhere in the function body and the test would still pass.
Other tests use
```
@Rule
public ExpectedException expectedException = ExpectedException.none();
```
for that.
---