Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r165398176
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
    @@ -753,6 +763,182 @@ public void onTimer(
                assertTrue(getOperatorForDataStream(processed) instanceof 
ProcessOperator);
        }
     
    +   @Test
    +   public void testConnectWithBroadcastTranslation() throws Exception {
    +
    +           final Map<Long, String> expected = new HashMap<>();
    +           expected.put(0L, "test:0");
    +           expected.put(1L, "test:1");
    +           expected.put(2L, "test:2");
    +           expected.put(3L, "test:3");
    +           expected.put(4L, "test:4");
    +           expected.put(5L, "test:5");
    +
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +           final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
    +                           .assignTimestampsAndWatermarks(new 
CustomWmEmitter<Long>() {
    +
    +                                   @Override
    +                                   public long extractTimestamp(Long 
element, long previousElementTimestamp) {
    +                                           return element;
    +                                   }
    +                           }).keyBy((KeySelector<Long, Long>) value -> 
value);
    +
    +           final DataStream<String> srcTwo = 
env.fromCollection(expected.values())
    +                           .assignTimestampsAndWatermarks(new 
CustomWmEmitter<String>() {
    +                                   @Override
    +                                   public long extractTimestamp(String 
element, long previousElementTimestamp) {
    +                                           return 
Long.parseLong(element.split(":")[1]);
    +                                   }
    +                           });
    +
    +           final BroadcastStream<String> broadcast = 
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
    +
    +           // the timestamp should be high enough to trigger the timer 
after all the elements arrive.
    +           final DataStream<String> output = 
srcOne.connect(broadcast).process(
    +                           new TestBroadcastProcessFunction(100000L, 
expected));
    +
    +           output.addSink(new DiscardingSink<>());
    +           env.execute();
    +   }
    +
    +   private abstract static class CustomWmEmitter<T> implements 
AssignerWithPunctuatedWatermarks<T> {
    +
    +           @Nullable
    +           @Override
    +           public Watermark checkAndGetNextWatermark(T lastElement, long 
extractedTimestamp) {
    +                   return new Watermark(extractedTimestamp);
    +           }
    +   }
    +
    +   private static class TestBroadcastProcessFunction extends 
KeyedBroadcastProcessFunction<Long, Long, String, String> {
    +
    +           private final Map<Long, String> expectedState;
    +
    +           private final long timerTimestamp;
    +
    +           static final MapStateDescriptor<Long, String> DESCRIPTOR = new 
MapStateDescriptor<>(
    +                           "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
    +           );
    +
    +           TestBroadcastProcessFunction(
    +                           final long timerTS,
    +                           final Map<Long, String> expectedBroadcastState
    +           ) {
    +                   expectedState = expectedBroadcastState;
    +                   timerTimestamp = timerTS;
    +           }
    +
    +           @Override
    +           public void processElement(Long value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
    +                   
ctx.timerService().registerEventTimeTimer(timerTimestamp);
    +           }
    +
    +           @Override
    +           public void processBroadcastElement(String value, 
KeyedReadWriteContext ctx, Collector<String> out) throws Exception {
    +                   long key = Long.parseLong(value.split(":")[1]);
    +                   ctx.getBroadcastState(DESCRIPTOR).put(key, value);
    +           }
    +
    +           @Override
    +           public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out) throws Exception {
    +                   Map<Long, String> map = new HashMap<>();
    +                   for (Map.Entry<Long, String> entry : 
ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) {
    +                           map.put(entry.getKey(), entry.getValue());
    +                   }
    +                   Assert.assertEquals(expectedState, map);
    +           }
    +   }
    +
    +   /**
    +    * Tests that with a {@link KeyedStream} we have to provide a {@link 
KeyedBroadcastProcessFunction}.
    +    */
    +   @Test(expected = IllegalArgumentException.class)
    --- End diff --
    
    The verification of the exception should happen right at the call site where we expect the exception. As it is now, with `@Test(expected = IllegalArgumentException.class)`, the exception can be thrown anywhere in the test method body and the test would still pass.
    
    Other tests use
    ```
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    ```
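    for that, calling `expectedException.expect(...)` immediately before the statement that is expected to throw.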


---

Reply via email to