[FLINK-5237] Consolidate and harmonize Window Translation Tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5368a7d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5368a7d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5368a7d3

Branch: refs/heads/master
Commit: 5368a7d32d96beb1b8298b87d9ea6d42ea306947
Parents: fe2a301
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Nov 24 08:14:48 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../operators/StateDescriptorPassingTest.java | 26 +
 .../windowing/WindowTranslationTest.java | 718 +++++++++++++++--
 .../ScalaProcessWindowFunctionWrapper.scala | 16 +-
 .../api/scala/WindowTranslationTest.scala | 766 +++++++++++++++++--
 4 files changed, 1420 insertions(+), 106 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 26cb7ac..813ca96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.windowing.time.Time; @@ -136,6 +137,31 @@ public class StateDescriptorPassingTest { } @Test + public void testProcessWindowState() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); + + DataStream<File> src = env.fromElements(new File("/")); + + SingleOutputStreamOperator<?> result = src + .keyBy(new KeySelector<File, String>() { + @Override + public String getKey(File value) { + return null; + } + }) + .timeWindow(Time.milliseconds(1000)) + .process(new ProcessWindowFunction<File, String, String, TimeWindow>() { + @Override + public void process(String s, Context ctx, + Iterable<File> input, Collector<String> out) {} + }); + + validateListStateDescriptorConfigured(result); + } + + @Test public void testFoldWindowAllState() throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index f72a2f1..b899948 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -31,14 +31,17 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; @@ -112,7 +115,7 @@ public class WindowTranslationTest { * in a {@code AggregatingState}. 
*/ @Test(expected = UnsupportedOperationException.class) - public void testAgrgegateWithRichFunctionFails() throws Exception { + public void testAggregateWithRichFunctionFails() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); @@ -405,6 +408,82 @@ public class WindowTranslationTest { processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithProcesWindowFunctionEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .reduce(reducer, new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process(String key, + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + 
Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process(String tuple, + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator 
instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + /** * Test for the deprecated .apply(Reducer, WindowFunction). */ @@ -447,6 +526,50 @@ public class WindowTranslationTest { processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } + /** + * Test for the deprecated .apply(Reducer, WindowFunction). + */ + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithPreReducerAndEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .evictor(CountEvictor.of(100)) + .apply(reducer, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + 
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + // ------------------------------------------------------------------------ // Aggregate Translation Tests // ------------------------------------------------------------------------ @@ -463,13 +586,13 @@ public class WindowTranslationTest { .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .aggregate(new DummyAggregationFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; 
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); @@ -492,13 +615,13 @@ public class WindowTranslationTest { .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .aggregate(new DummyAggregationFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); @@ -529,7 +652,7 @@ public class WindowTranslationTest { OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); @@ -569,6 +692,66 @@ public class WindowTranslationTest { operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } + @Test + public void testAggregateWithProcessWindowFunctionEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); + + processElementAndEnsureOutput( + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + public void testAggregateWithProcessWindowFunctionProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> 
transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); + + processElementAndEnsureOutput( + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + // ------------------------------------------------------------------------ // Fold Translation Tests // ------------------------------------------------------------------------ @@ -664,83 +847,269 @@ public class WindowTranslationTest { @SuppressWarnings("rawtypes") public void testFoldWithWindowFunctionProcessingTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + 
Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple2<>(in.f0, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldWithProcessWindowFunctionEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .fold(new Tuple3<>("", "", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process(String key, + Context ctx, + Iterable<Tuple3<String, String, Integer>> 
values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple2<>(in.f0, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldWithProcessWindowFunctionProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process(String key, + Context ctx, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple2<String, 
Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple2<>(in.f0, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithPreFolderEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { 
+ for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in. f1, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithPreFolderAndEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .evictor(CountEvictor.of(100)) + .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple3<String, String, Integer>> 
out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in. f1, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + + // ------------------------------------------------------------------------ + // Apply Translation Tests + // ------------------------------------------------------------------------ + + @Test + @SuppressWarnings("rawtypes") + public void testApplyEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - DataStream<Tuple2<String, Integer>> window = source + DataStream<Tuple2<String, Integer>> window1 = source .keyBy(new TupleKeySelector()) - .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + 
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { private static final long serialVersionUID = 1L; @Override public void apply(String key, TimeWindow window, - Iterable<Tuple3<String, String, Integer>> values, + Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for (Tuple3<String, String, Integer> in : values) { - out.collect(new Tuple2<>(in.f0, in.f2)); + for (Tuple2<String, Integer> in : values) { + out.collect(in); } } }); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; - Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); - Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); - Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } @Test @SuppressWarnings("rawtypes") - public void 
testApplyWithPreFolderEventTime() throws Exception { + public void testApplyProcessingTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - DataStream<Tuple3<String, String, Integer>> window = source + DataStream<Tuple2<String, Integer>> window1 = source .keyBy(new TupleKeySelector()) - .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { private static final long serialVersionUID = 1L; @Override public void apply(String key, TimeWindow window, - Iterable<Tuple3<String, String, Integer>> values, - Collector<Tuple3<String, String, Integer>> out) throws Exception { - for (Tuple3<String, String, Integer> in : values) { - out.collect(new Tuple3<>(in.f0, in. 
f1, in.f2)); + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); } } }); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator(); + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; - Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); - Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); - } - // ------------------------------------------------------------------------ - // Apply Translation Tests - // ------------------------------------------------------------------------ + } @Test @SuppressWarnings("rawtypes") - public void testApplyEventTime() throws Exception { + public void testProcessEventTime() throws Exception { 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); @@ -749,12 +1118,12 @@ public class WindowTranslationTest { DataStream<Tuple2<String, Integer>> window1 = source .keyBy(new TupleKeySelector()) .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { private static final long serialVersionUID = 1L; @Override - public void apply(String key, - TimeWindow window, + public void process(String key, + Context ctx, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception { for (Tuple2<String, Integer> in : values) { @@ -776,7 +1145,7 @@ public class WindowTranslationTest { @Test @SuppressWarnings("rawtypes") - public void testApplyProcessingTimeTime() throws Exception { + public void testProcessProcessingTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); @@ -785,12 +1154,12 @@ public class WindowTranslationTest { DataStream<Tuple2<String, Integer>> window1 = source .keyBy(new TupleKeySelector()) .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { private static final long serialVersionUID = 1L; @Override - public void apply(String key, - TimeWindow window, + public void process(String key, + Context ctx, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception { for (Tuple2<String, Integer> in : values) { @@ 
-903,6 +1272,43 @@ public class WindowTranslationTest { @Test @SuppressWarnings("rawtypes") + public void testProcessWithCustomTrigger() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .trigger(CountTrigger.of(1)) + .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process(String key, + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") public void testReduceWithEvictor() 
throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); @@ -930,6 +1336,121 @@ public class WindowTranslationTest { } @Test + @SuppressWarnings("rawtypes") + public void testReduceWithEvictorAndProcessFunction() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .reduce( + reducer, + new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + @Override + public void process( + Tuple tuple, + Context context, + Iterable<Tuple2<String, Integer>> elements, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : elements) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + public void testAggregateWithEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .aggregate(new DummyAggregationFunction()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput( + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + public void testAggregateWithEvictorAndProcessFunction() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + 
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .aggregate( + new DummyAggregationFunction(), + new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + @Override + public void process( + String s, + Context context, + Iterable<Tuple2<String, Integer>> elements, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : elements) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput( + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + + @Test @SuppressWarnings({"rawtypes", "unchecked"}) public void testFoldWithEvictor() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -958,6 +1479,48 @@ public class WindowTranslationTest { } @Test + 
@SuppressWarnings({"rawtypes", "unchecked"}) + public void testFoldWithEvictorAndProcessFunction() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window1 = source + .keyBy(0) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .fold( + new Tuple3<>("", "", 1), + new DummyFolder(), + new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, Tuple, TimeWindow>() { + @Override + public void process( + Tuple tuple, + Context context, + Iterable<Tuple3<String, String, Integer>> elements, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : elements) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + winOperator.setOutputType((TypeInformation) window1.getType(), new 
ExecutionConfig()); + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test @SuppressWarnings("rawtypes") public void testApplyWithEvictor() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -996,6 +1559,45 @@ public class WindowTranslationTest { processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } + @Test + @SuppressWarnings("rawtypes") + public void testProcessWithEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .trigger(CountTrigger.of(1)) + .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) + .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process(String key, + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + /** * Ensure that we get some output from the given operator when pushing in an element and * setting watermark and processing time to {@code Long.MAX_VALUE}. @@ -1012,6 +1614,12 @@ public class WindowTranslationTest { keySelector, keyType); + if (operator instanceof OutputTypeConfigurable) { + // use a dummy type since window functions just need the ExecutionConfig + // this is also only needed for Fold, which we're getting rid off soon. + ((OutputTypeConfigurable) operator).setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig()); + } + testHarness.open(); testHarness.setProcessingTime(0); @@ -1050,7 +1658,7 @@ public class WindowTranslationTest { } } - private static class DummyAggregationFunction + private static class DummyAggregationFunction implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> { @Override @@ -1096,7 +1704,7 @@ public class WindowTranslationTest { } } - private static class TestWindowFunction + private static class TestWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> { @Override @@ -1111,6 +1719,22 @@ public class WindowTranslationTest { } } + private static class TestProcessWindowFunction + extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> { + + @Override + public void process(String key, + Context ctx, + 
Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + } + + private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala index 4a20371..23293a6 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala @@ -18,8 +18,6 @@ package org.apache.flink.streaming.api.scala.function.util -import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext} -import org.apache.flink.api.java.operators.translation.WrappingFunction import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction} import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window @@ -37,8 +35,7 @@ import scala.collection.JavaConverters._ */ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W]) - extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func) - with JProcessWindowFunctionTrait[IN, OUT, KEY, W] { + extends 
JProcessWindowFunction[IN, OUT, KEY, W] { override def process( key: KEY, @@ -50,15 +47,4 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( } func.process(key, ctx, elements.asScala, out) } - - override def getRuntimeContext: RuntimeContext = { - throw new RuntimeException("This should never be called") - } - - override def getIterationRuntimeContext: IterationRuntimeContext = { - throw new RuntimeException("This should never be called") - } } - -private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W] - extends JProcessWindowFunction[IN, OUT, KEY, W]
