[FLINK-5237] Consolidate and harmonize Window Translation Tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d3ad451 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d3ad451 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d3ad451 Branch: refs/heads/release-1.2 Commit: 8d3ad451564ce2f0a53734695e245d85ede62ae8 Parents: ac2c58c Author: Aljoscha Krettek <[email protected]> Authored: Thu Nov 24 08:14:48 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Jan 11 15:47:30 2017 +0100 ---------------------------------------------------------------------- .../windowing/AllWindowTranslationTest.java | 841 ++++++++++--- .../windowing/TimeWindowTranslationTest.java | 112 +- .../windowing/WindowTranslationTest.java | 844 ++++++++++--- flink-streaming-scala/pom.xml | 8 + .../api/scala/AllWindowTranslationTest.scala | 1086 ++++++++++++++--- .../api/scala/TimeWindowTranslationTest.scala | 182 +++ .../api/scala/WindowTranslationTest.scala | 1131 +++++++++++++++--- 7 files changed, 3561 insertions(+), 643 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 72b0850..3d4de5d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,121 +17,548 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFoldFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** - * These tests verify that the api calls on - * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate - * the correct window operator. + * These tests verify that the api calls on {@link AllWindowedStream} instantiate the correct + * window operator. + * + * <p>We also create a test harness and push one element into the operator to verify + * that we get some output. */ +@SuppressWarnings("serial") public class AllWindowTranslationTest { /** - * These tests ensure that the correct trigger is set when using event-time windows. + * .reduce() does not support RichReduceFunction, since the reduce function is used internally + * in a {@code ReducingState}. */ - @Test - @SuppressWarnings("rawtypes") - public void testEventTime() throws Exception { + @Test(expected = UnsupportedOperationException.class) + public void testReduceWithRichReducerFails() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + source + .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .reduce(new RichReduceFunction<Tuple2<String, Integer>>() { + private static final long serialVersionUID = -6448847205314995812L; + + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + return null; + } + }); + + fail("exception was not thrown"); + } + + /** + * .fold() does not support RichFoldFunction, since the fold function is used internally + * in a {@code FoldingState}. + */ + @Test(expected = UnsupportedOperationException.class) + public void testFoldWithRichFolderFails() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + source + .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { + private static final long serialVersionUID = -6448847205314995812L; + + @Override + public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + return null; + } + }); + + fail("exception was not thrown"); + } + + + @Test + public void testSessionWithFoldFails() throws Exception { + // verify that fold does not work with merging windows + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao") + .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5))); + + try { + windowedStream.fold("", new FoldFunction<String, String>() { + private static final long serialVersionUID = -4567902917104921706L; + + @Override + public String fold(String accumulator, String value) throws Exception { + return accumulator; + } + }); + } catch (UnsupportedOperationException e) { + // expected + // use a catch to ensure that the exception is thrown by the fold + return; + } + + fail("The fold call should fail."); + } + + @Test + public void testMergingAssignerWithNonMergingTriggerFails() throws Exception { + // verify that we check for trigger compatibility + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao") + .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5))); + + try { + windowedStream.trigger(new Trigger<String, TimeWindow>() { + private static final long serialVersionUID = 6558046711583024443L; + + @Override + public TriggerResult onElement(String element, + long timestamp, + TimeWindow window, + TriggerContext ctx) throws Exception { + return null; + } + + @Override + public TriggerResult onProcessingTime(long time, + TimeWindow window, + TriggerContext ctx) throws Exception { + return null; + } + + @Override + public TriggerResult onEventTime(long time, + TimeWindow window, + TriggerContext ctx) throws Exception { + return null; + } + + @Override + public boolean canMerge() { + return false; + } + + @Override + public void clear(TimeWindow window, TriggerContext ctx) throws Exception {} + }); + } catch (UnsupportedOperationException e) { + // expected + // use a catch to ensure that the exception is thrown by the fold + return; + } + + fail("The trigger call should fail."); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DummyReducer reducer = new DummyReducer(); + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); DataStream<Tuple2<String, Integer>> window1 = source .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .reduce(reducer); + .reduce(new DummyReducer()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .reduce(new DummyReducer()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + + /** + * Ignored because we currently don't have the fast processing-time window operator. + */ + @Test + @SuppressWarnings("rawtypes") + @Ignore + public void testReduceFastProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .reduce(new DummyReducer()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof AggregatingProcessingTimeWindowOperator); + + processElementAndEnsureOutput(operator, null, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithWindowFunctionEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); - Assert.assertTrue(operator1 instanceof WindowOperator); - WindowOperator winOperator1 = (WindowOperator) operator1; - Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows); - Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor); + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); - DataStream<Tuple2<String, Integer>> window2 = source + DataStream<Tuple3<String, String, Integer>> window = source .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + .reduce(reducer, new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @Override public void apply( TimeWindow window, Iterable<Tuple2<String, Integer>> values, - Collector<Tuple2<String, Integer>> out) throws Exception { - + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } } }); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof WindowOperator); - WindowOperator winOperator2 = (WindowOperator) operator2; - Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows); - Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } @Test @SuppressWarnings("rawtypes") - public void testNonEvicting() throws Exception { + public void testReduceWithWindowFunctionProcessingTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> window = source + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .reduce(new DummyReducer(), new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply( + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + /** + * Test for the deprecated .apply(Reducer, WindowFunction). + */ + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithPreReducerEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DummyReducer reducer = new DummyReducer(); - DataStream<Tuple2<String, Integer>> window1 = source + DataStream<Tuple3<String, String, Integer>> window = source + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(reducer, new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply( + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window1 = source .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .trigger(CountTrigger.of(100)) - .reduce(reducer); + .fold(new Tuple3<>("", "", 1), new DummyFolder()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window = source + .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .fold(new Tuple3<>("", "", 0), new DummyFolder()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); - Assert.assertTrue(operator1 instanceof WindowOperator); - WindowOperator winOperator1 = (WindowOperator) operator1; - Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows); - Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor); + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); - DataStream<Tuple2<String, Integer>> window2 = source + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldWithWindowFunctionEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .fold(new Tuple3<>("", "", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply( + TimeWindow window, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple2<>(in.f0, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldWithWindowFunctionProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply( + TimeWindow window, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple2<>(in.f0, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithPreFolderEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window = source + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(new Tuple3<>("", "", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply( + TimeWindow window, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f1, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + + @Test + @SuppressWarnings("rawtypes") + public void testApplyEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -140,24 +567,65 @@ public class AllWindowTranslationTest { TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyProcessingTimeTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + @Override + public void apply( + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } } }); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof WindowOperator); - WindowOperator winOperator2 = (WindowOperator) operator2; - Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows); - Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + @Test @SuppressWarnings("rawtypes") - public void testEvicting() throws Exception { + public void testReduceWithCustomTrigger() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); @@ -165,22 +633,56 @@ public class AllWindowTranslationTest { DataStream<Tuple2<String, Integer>> window1 = source .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .evictor(CountEvictor.of(100)) + .trigger(CountTrigger.of(1)) .reduce(reducer); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); - Assert.assertTrue(operator1 instanceof EvictingWindowOperator); - EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1; - Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows); - Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); - Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor); + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } - DataStream<Tuple2<String, Integer>> window2 = source + @Test + @SuppressWarnings("rawtypes") + public void testFoldWithCustomTrigger() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window1 = source + .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .trigger(CountTrigger.of(1)) + .fold(new Tuple3<>("", "", 1), new DummyFolder()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithCustomTrigger() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) - .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) + .trigger(CountTrigger.of(1)) .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -189,136 +691,146 @@ public class AllWindowTranslationTest { TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception { - + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } } }); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof EvictingWindowOperator); - EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2; - Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows); - Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor); - Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } - /** - * These tests ensure that a Fold buffer is used if possible - */ @Test @SuppressWarnings("rawtypes") - public void testFoldBuffer() throws Exception { + public void testReduceWithEvictor() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DummyFolder folder = new DummyFolder(); + DummyReducer reducer = new DummyReducer(); - DataStream<Integer> window1 = source + DataStream<Tuple2<String, Integer>> window1 = source .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .fold(0, folder); + .evictor(CountEvictor.of(100)) + .reduce(reducer); - OneInputTransformation<Tuple2<String, Integer>, Integer> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator1 = transform1.getOperator(); - Assert.assertTrue(operator1 instanceof WindowOperator); - WindowOperator winOperator1 = (WindowOperator) operator1; - Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows); - Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor); + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); - DataStream<Integer> window2 = source - .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .evictor(CountEvictor.of(13)) - .fold(0, folder); - - OneInputTransformation<Tuple2<String, Integer>, Integer> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window2.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof WindowOperator); - WindowOperator winOperator2 = (WindowOperator) operator2; - Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows); - Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } @Test - public void testSessionWithFold() throws Exception { - // verify that fold does not work with merging windows + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testFoldWithEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment(); + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao") - .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5))); + DataStream<Tuple3<String, String, Integer>> window1 = source + .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .fold(new Tuple3<>("", "", 1), new DummyFolder()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig()); + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } - try { - windowedStream.fold("", new FoldFunction<String, String>() { - private static final long serialVersionUID = -8722899157560218917L; + @Test + @SuppressWarnings("rawtypes") + public void testApplyWithEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - @Override - public String fold(String accumulator, String value) throws Exception { - return accumulator; - } - }); - } catch (UnsupportedOperationException e) { - // expected - // use a catch to ensure that the exception is thrown by the fold - return; - } + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - fail("The fold call should fail."); - } + DataStream<Tuple2<String, Integer>> window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .trigger(CountTrigger.of(1)) + .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) + .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; - @Test - public void testMergingAssignerWithNonMergingTrigger() throws Exception { - // verify that we check for trigger compatibility + @Override + public void apply( + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); - StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment(); + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); - AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao") - .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5))); + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } - try { - windowedStream.trigger(new Trigger<String, TimeWindow>() { - private static final long serialVersionUID = 8360971631424870421L; + /** + * Ensure that we get some output from the given operator when pushing in an element and + * setting watermark and processing time to {@code Long.MAX_VALUE}. + */ + private static <K, IN, OUT> void processElementAndEnsureOutput( + OneInputStreamOperator<IN, OUT> operator, + KeySelector<IN, K> keySelector, + TypeInformation<K> keyType, + IN element) throws Exception { - @Override - public TriggerResult onElement(String element, - long timestamp, - TimeWindow window, - TriggerContext ctx) throws Exception { - return null; - } + KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + operator, + keySelector, + keyType); - @Override - public TriggerResult onProcessingTime(long time, - TimeWindow window, - TriggerContext ctx) throws Exception { - return null; - } + testHarness.open(); - @Override - public TriggerResult onEventTime(long time, - TimeWindow window, - TriggerContext ctx) throws Exception { - return null; - } + testHarness.setProcessingTime(0); + testHarness.processWatermark(Long.MIN_VALUE); - @Override - public boolean canMerge() { - return false; - } + testHarness.processElement(new StreamRecord<>(element, 0)); - @Override - public void clear(TimeWindow window, TriggerContext ctx) throws Exception {} - }); - } catch (UnsupportedOperationException e) { - // expected - // use a catch to ensure that the exception is thrown by the fold - return; - } + // provoke any processing-time/event-time triggers + testHarness.setProcessingTime(Long.MAX_VALUE); + testHarness.processWatermark(Long.MAX_VALUE); - fail("The trigger call should fail."); + // we at least get the two watermarks and should also see an output element + assertTrue(testHarness.getOutput().size() >= 3); + + testHarness.close(); } // ------------------------------------------------------------------------ @@ -334,13 +846,12 @@ public class AllWindowTranslationTest { } } - public static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Integer> { - private static final long serialVersionUID = 1L; - + private static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> { @Override - public Integer fold(Integer accumulator, Tuple2<String, Integer> value) throws Exception { + public Tuple3<String, String, Integer> fold( + Tuple3<String, String, Integer> accumulator, + Tuple2<String, Integer> value) throws Exception { return accumulator; } } - } http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index 5aa8151..8e37021 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,7 +17,9 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.java.tuple.Tuple; @@ -44,9 +46,8 @@ import org.junit.Test; import java.util.concurrent.TimeUnit; /** - * These tests verify that the api calls on - * {@link WindowedStream} instantiate - * the correct window operator. + * These tests verify that the api calls on {@link WindowedStream} that use the "time" shortcut + * instantiate the correct window operator. */ public class TimeWindowTranslationTest { @@ -56,8 +57,9 @@ public class TimeWindowTranslationTest { */ @Test @Ignore - public void testFastTimeWindows() throws Exception { + public void testReduceFastTimeWindows() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); @@ -71,8 +73,21 @@ public class TimeWindowTranslationTest { OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator); + } - DataStream<Tuple2<String, Integer>> window2 = source + /** + * These tests ensure that the fast aligned time windows operator is used if the + * conditions are right. + */ + @Test + @Ignore + public void testApplyFastTimeWindows() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source .keyBy(0) .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS)) .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { @@ -87,25 +102,25 @@ public class TimeWindowTranslationTest { } }); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator); + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof AccumulatingProcessingTimeWindowOperator); } @Test @SuppressWarnings("rawtypes") - public void testEventTimeWindows() throws Exception { + public void testReduceEventTimeWindows() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - - DummyReducer reducer = new DummyReducer(); + DataStream<Tuple2<String, Integer>> source = env.fromElements( + Tuple2.of("hello", 1), + Tuple2.of("hello", 2)); DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(0) - .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS)) - .reduce(reducer); + .keyBy(0) + .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS)) + .reduce(new DummyReducer()); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); @@ -114,8 +129,43 @@ public class TimeWindowTranslationTest { Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor); + } - DataStream<Tuple2<String, Integer>> window2 = source + @Test + @SuppressWarnings("rawtypes") + public void testFoldEventTimeWindows() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements( + Tuple2.of("hello", 1), + Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS)) + .fold(new Tuple2<>("", 1), new DummyFolder()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof WindowOperator); + WindowOperator winOperator1 = (WindowOperator) operator1; + Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor); + } + + @Test + @SuppressWarnings("rawtypes") + public void testApplyEventTimeWindows() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements( + Tuple2.of("hello", 1), + Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source .keyBy(0) .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS)) .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { @@ -130,14 +180,13 @@ public class TimeWindowTranslationTest { } }); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); - Assert.assertTrue(operator2 instanceof WindowOperator); - WindowOperator winOperator2 = (WindowOperator) operator2; - Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger); - Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows); - Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); - + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof WindowOperator); + WindowOperator winOperator1 = (WindowOperator) operator1; + Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor); } /** @@ -187,7 +236,7 @@ public class TimeWindowTranslationTest { // UDFs // ------------------------------------------------------------------------ - public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override @@ -195,4 +244,15 @@ public class TimeWindowTranslationTest { return value1; } } + + private static class DummyFolder + implements FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { + return value1; + } + } + }
