http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 66de849..492d275 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,46 +17,59 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFoldFunction; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.WindowedStream; -import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** - * These tests verify that the api calls on - * {@link WindowedStream} instantiate - * the correct window operator. + * These tests verify that the api calls on {@link WindowedStream} instantiate the correct + * window operator. + * + * <p>We also create a test harness and push one element into the operator to verify + * that we get some output. */ @SuppressWarnings("serial") public class WindowTranslationTest { @@ -66,13 +79,13 @@ public class WindowTranslationTest { * in a {@code ReducingState}. */ @Test(expected = UnsupportedOperationException.class) - public void testReduceFailWithRichReducer() throws Exception { + public void testReduceWithRichReducerFails() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream<Tuple2<String, Integer>> window1 = source + source .keyBy(0) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .reduce(new RichReduceFunction<Tuple2<String, Integer>>() { @@ -84,163 +97,43 @@ public class WindowTranslationTest { return null; } }); + + fail("exception was not thrown"); } /** - * These tests ensure that the correct trigger is set when using event-time windows. + * .fold() does not support RichFoldFunction, since the fold function is used internally + * in a {@code FoldingState}. */ - @Test - @SuppressWarnings("rawtypes") - public void testEventTime() throws Exception { + @Test(expected = UnsupportedOperationException.class) + public void testFoldWithRichFolderFails() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - - DummyReducer reducer = new DummyReducer(); - - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(0) - .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .reduce(reducer); - - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); - Assert.assertTrue(operator1 instanceof WindowOperator); - WindowOperator winOperator1 = (WindowOperator) operator1; - Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows); - Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor); - - DataStream<Tuple2<String, Integer>> window2 = source - .keyBy(0) - .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { - private static final long serialVersionUID = 1L; - - @Override - public void apply(Tuple tuple, - TimeWindow window, - Iterable<Tuple2<String, Integer>> values, - Collector<Tuple2<String, Integer>> out) throws Exception { - } - }); - - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof WindowOperator); - WindowOperator winOperator2 = (WindowOperator) operator2; - Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows); - Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); - } - - @Test - @SuppressWarnings("rawtypes") - public void testNonEvicting() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - - DummyReducer reducer = new DummyReducer(); - - DataStream<Tuple2<String, Integer>> window1 = source + source .keyBy(0) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .trigger(CountTrigger.of(100)) - .reduce(reducer); - - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); - Assert.assertTrue(operator1 instanceof WindowOperator); - WindowOperator winOperator1 = (WindowOperator) operator1; - Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows); - Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor); - - DataStream<Tuple2<String, Integer>> window2 = source - .keyBy(0) - .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { - private static final long serialVersionUID = 1L; + .fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { + private static final long serialVersionUID = -6448847205314995812L; @Override - public void apply(Tuple tuple, - TimeWindow window, - Iterable<Tuple2<String, Integer>> values, - Collector<Tuple2<String, Integer>> out) throws Exception { - + public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + return null; } }); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof WindowOperator); - WindowOperator winOperator2 = (WindowOperator) operator2; - Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows); - Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); + fail("exception was not thrown"); } - @Test - @SuppressWarnings("rawtypes") - public void testEvicting() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - - DummyReducer reducer = new DummyReducer(); - - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(0) - .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .evictor(CountEvictor.of(100)) - .reduce(reducer); - - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); - Assert.assertTrue(operator1 instanceof EvictingWindowOperator); - EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1; - Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows); - Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); - Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor); - - DataStream<Tuple2<String, Integer>> window2 = source - .keyBy(0) - .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) - .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { - private static final long serialVersionUID = 1L; - - @Override - public void apply(Tuple tuple, - TimeWindow window, - Iterable<Tuple2<String, Integer>> values, - Collector<Tuple2<String, Integer>> out) throws Exception { - - } - }); - - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof EvictingWindowOperator); - EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2; - Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows); - Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor); - Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); - } @Test - public void testSessionWithFold() throws Exception { + public void testSessionWithFoldFails() throws Exception { // verify that fold does not work with merging windows - StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao") .keyBy(new KeySelector<String, String>() { @@ -272,10 +165,10 @@ public class WindowTranslationTest { } @Test - public void testMergingAssignerWithNonMergingTrigger() throws Exception { + public void testMergingAssignerWithNonMergingTriggerFails() throws Exception { // verify that we check for trigger compatibility - StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao") .keyBy(new KeySelector<String, String>() { @@ -331,7 +224,649 @@ public class WindowTranslationTest { fail("The trigger call should fail."); } + @Test + @SuppressWarnings("rawtypes") + public void testReduceEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .reduce(new DummyReducer()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .reduce(new DummyReducer()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + + /** + * Ignored because we currently don't have the fast processing-time window operator. + */ + @Test + @SuppressWarnings("rawtypes") + @Ignore + public void testReduceFastProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .reduce(new DummyReducer()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof AggregatingProcessingTimeWindowOperator); + + processElementAndEnsureOutput(operator, null, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithWindowFunctionEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .reduce(reducer, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithWindowFunctionProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .reduce(new DummyReducer(), new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String tuple, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + /** + * Test for the deprecated .apply(Reducer, WindowFunction). + */ + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithPreReducerEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(reducer, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window1 = source + .keyBy(0) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .fold(new Tuple3<>("", "", 1), new DummyFolder()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .fold(new Tuple3<>("", "", 0), new DummyFolder()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldWithWindowFunctionEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .fold(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple2<>(in.f0, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldWithWindowFunctionProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple2<>(in.f0, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithPreFolderEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in. f1, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyProcessingTimeTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithCustomTrigger() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .trigger(CountTrigger.of(1)) + .reduce(reducer); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldWithCustomTrigger() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window1 = source + .keyBy(0) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .trigger(CountTrigger.of(1)) + .fold(new Tuple3<>("", "", 1), new DummyFolder()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithCustomTrigger() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .trigger(CountTrigger.of(1)) + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .reduce(reducer); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testFoldWithEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window1 = source + .keyBy(0) + .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .fold(new Tuple3<>("", "", 1), new DummyFolder()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig()); + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(new TupleKeySelector()) + .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .trigger(CountTrigger.of(1)) + .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) + .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + /** + * Ensure that we get some output from the given operator when pushing in an element and + * setting watermark and processing time to {@code Long.MAX_VALUE}. + */ + private static <K, IN, OUT> void processElementAndEnsureOutput( + OneInputStreamOperator<IN, OUT> operator, + KeySelector<IN, K> keySelector, + TypeInformation<K> keyType, + IN element) throws Exception { + + KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + operator, + keySelector, + keyType); + + testHarness.open(); + + testHarness.setProcessingTime(0); + testHarness.processWatermark(Long.MIN_VALUE); + + testHarness.processElement(new StreamRecord<>(element, 0)); + + // provoke any processing-time/event-time triggers + testHarness.setProcessingTime(Long.MAX_VALUE); + testHarness.processWatermark(Long.MAX_VALUE); + // we at least get the two watermarks and should also see an output element + assertTrue(testHarness.getOutput().size() >= 3); + + testHarness.close(); + } // ------------------------------------------------------------------------ // UDFs @@ -345,4 +880,23 @@ public class WindowTranslationTest { return value1; } } + + private static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> { + @Override + public Tuple3<String, String, Integer> fold( + Tuple3<String, String, Integer> accumulator, + Tuple2<String, Integer> value) throws Exception { + return accumulator; + } + } + + private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple2<String, Integer> value) throws Exception { + return value.f0; + } + } + }
http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml index b0cc961..ddc3e05 100644 --- a/flink-streaming-scala/pom.xml +++ b/flink-streaming-scala/pom.xml @@ -106,6 +106,14 @@ under the License. <type>test-jar</type> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> <build>
