[FLINK-7635] Add side-output test in WindowOperatorContractTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ebd44a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ebd44a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ebd44a6

Branch: refs/heads/master
Commit: 1ebd44a634fe5053c89acf7092571a6b169f11b9
Parents: c151a53
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Sep 25 15:49:44 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Sep 25 15:49:44 2017 +0200

----------------------------------------------------------------------
 .../windowing/WindowOperatorContractTest.java   | 54 ++++++++++++++++++++
 1 file changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ebd44a6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 8ceda45..bd263f6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -366,6 +366,60 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
        }
 
+       /**
+        * This also verifies that the timestamps of side-emitted records are 
correct.
+        */
+       @Test
+       public void testSideOutput() throws Exception {
+
+               final OutputTag<Integer> integerOutputTag = new 
OutputTag<Integer>("int-out") {};
+               final OutputTag<Long> longOutputTag = new 
OutputTag<Long>("long-out") {};
+
+               WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
+               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+
+               InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> windowFunction =
+                       new InternalWindowFunction<Iterable<Integer>, Void, 
Integer, TimeWindow>() {
+                               @Override
+                               public void process(
+                                               Integer integer,
+                                               TimeWindow window,
+                                               InternalWindowContext ctx,
+                                               Iterable<Integer> input,
+                                               Collector<Void> out) throws 
Exception {
+                                       Integer inputValue = 
input.iterator().next();
+
+                                       ctx.output(integerOutputTag, 
inputValue);
+                                       ctx.output(longOutputTag, 
inputValue.longValue());
+                               }
+
+                               @Override
+                               public void clear(
+                                       TimeWindow window,
+                                       InternalWindowContext context) throws 
Exception {}
+                       };
+
+               OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+                       createWindowOperator(mockAssigner, mockTrigger, 0L, 
windowFunction);
+
+               testHarness.open();
+
+               final long windowEnd = 42L;
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                       .thenReturn(Collections.singletonList(new TimeWindow(0, 
windowEnd)));
+
+               shouldFireOnElement(mockTrigger);
+
+               testHarness.processElement(new StreamRecord<>(17, 5L));
+
+               assertThat(testHarness.getSideOutput(integerOutputTag),
+                       contains(isStreamRecord(17, windowEnd - 1)));
+
+               assertThat(testHarness.getSideOutput(longOutputTag),
+                       contains(isStreamRecord(17L, windowEnd - 1)));
+       }
+
        @Test
        public void testAssignerIsInvokedOncePerElement() throws Exception {
 

Reply via email to