http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 104bc7b..a9c3ef6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -59,7 +60,9 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -432,6 +435,78 @@ public class WindowOperatorTest extends TestLogger {
 
 	@Test
 	@SuppressWarnings("unchecked")
+	public void testSessionWindowsWithProcessFunction() throws Exception {
+		closeCalled.set(0);
+		// Event-time session windows kept in list state and evaluated by a ProcessWindowFunction; state is snapshotted and restored mid-stream.
+		final int SESSION_SIZE = 3; // session gap in seconds (passed to Time.seconds below)
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableProcessWindowFunction<>(new SessionProcessWindowFunction()),
+				EventTimeTrigger.create(),
+				0);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+
+		// do a snapshot, close and restore again
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+		testHarness.processWatermark(new Watermark(12000));
+		// first session per key: 1 + 2 + 3 = 6; window end = last element (2500) + 3000 ms gap, record timestamp = end - 1
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+		// second key2 session: 4 + 5 + 5 + 6 = 20 over [5501, 6050 + 3000)
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+		expectedOutput.add(new Watermark(12000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+		testHarness.processWatermark(new Watermark(17999));
+		// 10 + 20 = 30; the watermark 17999 equals the window end (15000 + 3000) - 1
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+		expectedOutput.add(new Watermark(17999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
 	public void testReduceSessionWindows() throws Exception {
 		closeCalled.set(0);
 
@@ -500,6 +575,76 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testReduceSessionWindowsWithProcessFunction() throws Exception {
+		closeCalled.set(0);
+		// Session windows whose contents live in reducing state (SumReducer) and are emitted through a ProcessWindowFunction.
+		final int SESSION_SIZE = 3; // session gap in seconds (passed to Time.seconds below)
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>(
+				"window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
+				EventTimeTrigger.create(),
+				0);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+		// do a snapshot, close and restore again
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+		testHarness.processWatermark(new Watermark(12000));
+		// NOTE(review): the expected values equal the per-session sums of f1, so SumReducer (defined outside this patch) presumably adds f1 — confirm
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+		expectedOutput.add(new Watermark(12000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+		testHarness.processWatermark(new Watermark(17999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+		expectedOutput.add(new Watermark(17999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
 	/**
 	 * This tests whether merging works correctly with the CountTrigger.
 	 * @throws Exception
@@ -2379,6 +2524,38 @@ public class WindowOperatorTest extends TestLogger {
 		}
 	}
 
+	public static class SessionProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+		/** Emits one record per call: (key + "-" + sum of all f1 values, window start, window end). */
+		@Override
+		public void process(String key,
+				Context context,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, Long, Long>> out) throws Exception {
+			int sum = 0;
+			for (Tuple2<String, Integer> i: values) {
+				sum += i.f1;
+			}
+			String resultString = key + "-" + sum;
+			TimeWindow window = context.window();
+			out.collect(new Tuple3<>(resultString, window.getStart(), window.getEnd()));
+		}
+	}
+
+	public static class ReducedProcessSessionWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+		/** Emits (key + "-" + f1, window start, window end) for every element of {@code values}. */
+		@Override
+		public void process(String key,
+				Context context,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, Long, Long>> out) throws Exception {
+			TimeWindow window = context.window();
+			for (Tuple2<String, Integer> val: values) {
+				out.collect(new Tuple3<>(key + "-" + val.f1, window.getStart(), window.getEnd()));
+			}
+		}
+	}
 	public static class PointSessionWindows extends EventTimeSessionWindows {
 		private static final long serialVersionUID = 1L;
 
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 1e3e3d5..7d37d1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -18,17 +18,22 @@ package org.apache.flink.test.streaming.runtime;
 
 
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -114,6 +119,79 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testFoldProcessWindow() throws Exception {
+		// Folds each 3 ms tumbling event-time window per key and passes the folded value through a ProcessWindowFunction.
+		testResults = new ArrayList<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+				ctx.collect(Tuple2.of("a", 2));
+
+				ctx.collect(Tuple2.of("b", 3));
+				ctx.collect(Tuple2.of("b", 4));
+				ctx.collect(Tuple2.of("b", 5));
+
+				ctx.collect(Tuple2.of("a", 6));
+				ctx.collect(Tuple2.of("a", 7));
+				ctx.collect(Tuple2.of("a", 8));
+
+				// source is finite, so it will have an implicit MAX watermark when it finishes
+			}
+
+			@Override
+			public void cancel() {}
+
+		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
+		// NOTE(review): Tuple2TimestampExtractor is defined outside this patch; presumably it uses f1 as the event timestamp — confirm
+		source1
+				.keyBy(0)
+				.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.fold(Tuple2.of(0, "R:"), new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
+					@Override
+					public Tuple2<Integer, String> fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws Exception {
+						accumulator.f1 += value.f0; // append the key to the string part
+						accumulator.f0 += value.f1; // add the value to the running sum
+						return accumulator;
+					}
+				}, new ProcessWindowFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, Tuple, TimeWindow>() {
+					@Override
+					public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
+						int i = 0; // position of the element within the iterable
+						for (Tuple2<Integer, String> in : elements) {
+							out.collect(new Tuple3<>(in.f1, in.f0, i++));
+						}
+					}
+				})
+				.addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() {
+					@Override
+					public void invoke(Tuple3<String, Integer, Integer> value) throws Exception {
+						testResults.add(value.toString());
+					}
+				});
+
+		env.execute("Fold Process Window Test");
+		// one record per window: ("R:" + concatenated keys, sum of values, element index); the index is 0 in every expected record
+		List<String> expectedResult = Arrays.asList(
+				"(R:aaa,3,0)",
+				"(R:aaa,21,0)",
+				"(R:bbb,12,0)");
+
+		Collections.sort(expectedResult);
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	@Test
 	public void testFoldAllWindow() throws Exception {
 
 		testResults = new ArrayList<>();
