[FLINK-7635] Add side-output test in WindowOperatorContractTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ebd44a6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ebd44a6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ebd44a6 Branch: refs/heads/master Commit: 1ebd44a634fe5053c89acf7092571a6b169f11b9 Parents: c151a53 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Sep 25 15:49:44 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Sep 25 15:49:44 2017 +0200 ---------------------------------------------------------------------- .../windowing/WindowOperatorContractTest.java | 54 ++++++++++++++++++++ 1 file changed, 54 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1ebd44a6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index 8ceda45..bd263f6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -366,6 +366,60 @@ public abstract class WindowOperatorContractTest extends TestLogger { } + /** + * This also verifies that the timestamps of side-emitted records are correct. 
+     */
+    @Test
+    public void testSideOutput() throws Exception { // checks the values and timestamps of records that a window function emits to two side outputs
+
+        final OutputTag<Integer> integerOutputTag = new OutputTag<Integer>("int-out") {}; // tag for the Integer copy of the element
+        final OutputTag<Long> longOutputTag = new OutputTag<Long>("long-out") {}; // tag for the same element converted to a Long
+
+        WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+        Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+
+        InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> windowFunction =
+                new InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow>() { // never writes to the main collector, only to the two side outputs
+                    @Override
+                    public void process(
+                            Integer integer,
+                            TimeWindow window,
+                            InternalWindowContext ctx,
+                            Iterable<Integer> input,
+                            Collector<Void> out) throws Exception {
+                        Integer inputValue = input.iterator().next(); // only the first element of the window contents is forwarded
+
+                        ctx.output(integerOutputTag, inputValue);
+                        ctx.output(longOutputTag, inputValue.longValue());
+                    }
+
+                    @Override
+                    public void clear(
+                            TimeWindow window,
+                            InternalWindowContext context) throws Exception {} // intentionally a no-op
+                };
+
+        OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+                createWindowOperator(mockAssigner, mockTrigger, 0L, windowFunction); // NOTE(review): 0L is presumably the allowed lateness; the helper is defined outside this hunk, confirm
+
+        testHarness.open();
+
+        final long windowEnd = 42L;
+
+        when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) // stub: any element and timestamp is assigned to one and the same window
+                .thenReturn(Collections.singletonList(new TimeWindow(0, windowEnd)));
+
+        shouldFireOnElement(mockTrigger); // NOTE(review): presumably makes the mock trigger fire for each element; helper is defined outside this hunk
+
+        testHarness.processElement(new StreamRecord<>(17, 5L)); // value 17 with element timestamp 5
+
+        assertThat(testHarness.getSideOutput(integerOutputTag),
+                contains(isStreamRecord(17, windowEnd - 1))); // expected timestamp is windowEnd - 1 (41), not the element's own timestamp 5
+
+        assertThat(testHarness.getSideOutput(longOutputTag),
+                contains(isStreamRecord(17L, windowEnd - 1))); // same timestamp expectation for the Long side output
+    }
+
     @Test
     public void testAssignerIsInvokedOncePerElement() throws Exception {