Repository: flink Updated Branches: refs/heads/master 87b907736 -> 788b83921
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index b6c1618..34eac9e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; @@ -383,6 +384,119 @@ public class AllWindowTranslationTest { processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithProcessWindowFunctionEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + 
DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple3<String, String, Integer>> window = source + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .reduce(reducer, new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process( + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> 
window = source + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .reduce(new DummyReducer(), new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process( + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor); + + processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testReduceWithEvictorAndProcessFunction() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + 
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .reduce( + reducer, + new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + @Override + public void process( + Context context, + Iterable<Tuple2<String, Integer>> elements, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : elements) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + /** * Test for the deprecated .apply(Reducer, WindowFunction). 
*/ @@ -540,6 +654,226 @@ public class AllWindowTranslationTest { operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } + @Test + public void testAggregateWithEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .aggregate(new DummyAggregationFunction()); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput( + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + public void testAggregateWithEvictorAndProcessFunction() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), 
Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .aggregate( + new DummyAggregationFunction(), + new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + @Override + public void process( + Context context, + Iterable<Tuple2<String, Integer>> elements, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : elements) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput( + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + // ------------------------------------------------------------------------ + // process() translation tests + // ------------------------------------------------------------------------ + + @Test + @SuppressWarnings("rawtypes") + public void testProcessEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, 
Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process( + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testProcessProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS))) + .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process( + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + + } + + @Test + @SuppressWarnings("rawtypes") + public void testProcessWithEvictor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .trigger(CountTrigger.of(1)) + .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) + .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, 
Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process( + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testProcessWithCustomTrigger() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .trigger(CountTrigger.of(1)) + .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override 
+ public void process( + Context ctx, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in : values) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + // ------------------------------------------------------------------------ // fold() translation tests // ------------------------------------------------------------------------ @@ -666,6 +1000,117 @@ public class AllWindowTranslationTest { @Test @SuppressWarnings("rawtypes") + public void testFoldWithProcessAllWindowFunctionEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .fold(new Tuple3<>("", "", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, TimeWindow>() { + private static final long serialVersionUID = 1L; + @Override + public void process( + Context ctx, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple2<>(in.f0, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") + public void testFoldWithProcessAllWindowFunctionProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple2<String, Integer>> window = source + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + private static final long 
serialVersionUID = 1L; + + @Override + public void process( + Context ctx, + Iterable<Tuple3<String, String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : values) { + out.collect(new Tuple2<>(in.f0, in.f2)); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof WindowOperator); + WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor); + + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testFoldWithEvictorAndProcessFunction() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DataStream<Tuple3<String, String, Integer>> window1 = source + .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(CountEvictor.of(100)) + .fold( + new Tuple3<>("", "", 1), + new DummyFolder(), + new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { + @Override + 
public void process( + Context context, + Iterable<Tuple3<String, String, Integer>> elements, + Collector<Tuple3<String, String, Integer>> out) throws Exception { + for (Tuple3<String, String, Integer> in : elements) { + out.collect(in); + } + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = + (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + Assert.assertTrue(operator instanceof EvictingWindowOperator); + EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator; + Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); + Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); + + winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig()); + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + } + + @Test + @SuppressWarnings("rawtypes") public void testApplyWithPreFolderEventTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index cf062fc..694353c 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -18,20 +18,19 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{PublicEvolving, Public} +import org.apache.flink.annotation.{Public, PublicEvolving} import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} -import org.apache.flink.streaming.api.scala.function.AllWindowFunction -import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaReduceFunction, ScalaFoldFunction} +import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction} +import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaFoldFunction, ScalaProcessAllWindowFunctionWrapper, ScalaReduceFunction} import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.Trigger import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector - import org.apache.flink.util.Preconditions.checkNotNull /** @@ -199,6 +198,64 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { asScalaStream(javaStream.reduce(reducer, applyFunction, returnType)) 
} + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param windowFunction The process window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + def reduce[R: TypeInformation]( + preAggregator: ReduceFunction[T], + windowFunction: ProcessAllWindowFunction[T, R, W]): DataStream[R] = { + + val cleanedReducer = clean(preAggregator) + val cleanedWindowFunction = clean(windowFunction) + + val applyFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, W](cleanedWindowFunction) + + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.reduce(cleanedReducer, applyFunction, returnType)) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param windowFunction The process window function. + * @return The data stream that is the result of applying the window function to the window.
+ */ + @PublicEvolving + def reduce[R: TypeInformation]( + preAggregator: (T, T) => T, + windowFunction: ProcessAllWindowFunction[T, R, W]): DataStream[R] = { + + if (preAggregator == null) { + throw new NullPointerException("Reduce function must not be null.") + } + if (windowFunction == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanReducer = clean(preAggregator) + val cleanWindowFunction = clean(windowFunction) + + val reducer = new ScalaReduceFunction[T](cleanReducer) + val applyFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, W](cleanWindowFunction) + + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.reduce(reducer, applyFunction, returnType)) + } + // --------------------------- aggregate() ---------------------------------- /** @@ -257,6 +314,39 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { /** * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given aggregation function. + * + * @param preAggregator The aggregation function that is used for pre-aggregation + * @param windowFunction The process window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + @PublicEvolving + def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation] + (preAggregator: AggregateFunction[T, ACC, V], + windowFunction: ProcessAllWindowFunction[V, R, W]): DataStream[R] = { + + checkNotNull(preAggregator, "AggregationFunction must not be null") + checkNotNull(windowFunction, "Window function must not be null") + + val cleanedPreAggregator = clean(preAggregator) + val cleanedWindowFunction = clean(windowFunction) + + val applyFunction = new ScalaProcessAllWindowFunctionWrapper[V, R, W](cleanedWindowFunction) + + val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]] + val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]] + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] + + asScalaStream(javaStream.aggregate( + cleanedPreAggregator, applyFunction, + accumulatorType, aggregationResultType, resultType)) + } + + /** + * Applies the given window function to each window. The window function is called for each * evaluation of the window. The output of the window function is * interpreted as a regular non-windowed stream. * @@ -367,6 +457,37 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * * @param initialValue Initial value of the fold * @param preAggregator The reduce function that is used for pre-aggregation + * @param windowFunction The process window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + @PublicEvolving + def fold[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + preAggregator: FoldFunction[T, ACC], + windowFunction: ProcessAllWindowFunction[ACC, R, W]): DataStream[R] = { + + val cleanFolder = clean(preAggregator) + val cleanWindowFunction = clean(windowFunction) + + val applyFunction = new ScalaProcessAllWindowFunctionWrapper[ACC, R, W](cleanWindowFunction) + + asScalaStream(javaStream.fold( + initialValue, + cleanFolder, + applyFunction, + implicitly[TypeInformation[ACC]], + implicitly[TypeInformation[R]])) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation folder. + * + * @param initialValue Initial value of the fold + * @param preAggregator The reduce function that is used for pre-aggregation * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ @@ -393,6 +514,42 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, returnType)) } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation folder. + * + * @param initialValue Initial value of the fold + * @param preAggregator The reduce function that is used for pre-aggregation + * @param windowFunction The window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + @PublicEvolving + def fold[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + preAggregator: (ACC, T) => ACC, + windowFunction: ProcessAllWindowFunction[ACC, R, W]): DataStream[R] = { + + if (preAggregator == null) { + throw new NullPointerException("Reduce function must not be null.") + } + if (windowFunction == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanFolder = clean(preAggregator) + val cleanWindowFunction = clean(windowFunction) + + val folder = new ScalaFoldFunction[T, ACC](cleanFolder) + val applyFunction = new ScalaProcessAllWindowFunctionWrapper[ACC, R, W](cleanWindowFunction) + + val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]] + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, returnType)) + } + // ---------------------------- apply() ------------------------------------- /** @@ -403,6 +560,27 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * Not that this function requires that all data in the windows is buffered until the window * is evaluated, as the function provides no means of pre-aggregation. * + * @param function The process window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + def process[R: TypeInformation]( + function: ProcessAllWindowFunction[T, R, W]): DataStream[R] = { + + val cleanedFunction = clean(function) + val javaFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, W](cleanedFunction) + + asScalaStream(javaStream.process(javaFunction, implicitly[TypeInformation[R]])) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. 
+ * + * Not that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of pre-aggregation. + * * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala new file mode 100644 index 0000000..163117b --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala.function + +import java.io.Serializable + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.functions.Function +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +/** + * Base abstract class for functions that are evaluated over keyed (grouped) + * windows using a context for retrieving extra information. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + * @tparam W The type of the window. + */ +@PublicEvolving +abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable { + /** + * Evaluates the window and outputs none or several elements. + * + * @param context The context in which the window is being evaluated. + * @param elements The elements in the window being evaluated. + * @param out A collector for emitting elements. + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + @throws[Exception] + def process(context: Context, elements: Iterable[IN], out: Collector[OUT]) + + /** + * The context holding window metadata + */ + abstract class Context { + /** + * @return The window that is being evaluated. 
+ */ + def window: W + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala new file mode 100644 index 0000000..22d64a8 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala.function + +import java.beans.Transient + +import org.apache.flink.annotation.Public +import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Base abstract class for functions that are evaluated over + * keyed (grouped) windows using a context for retrieving extra information. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + * @tparam W The type of the window. + */ +@Public +abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window] + extends ProcessAllWindowFunction[IN, OUT, W] + with RichFunction { + + @Transient + private var runtimeContext: RuntimeContext = null + + // -------------------------------------------------------------------------------------------- + // Runtime context access + // -------------------------------------------------------------------------------------------- + + override def setRuntimeContext(t: RuntimeContext) { + this.runtimeContext = t + } + + override def getRuntimeContext: RuntimeContext = { + if (this.runtimeContext != null) { + this.runtimeContext + } + else { + throw new IllegalStateException("The runtime context has not been initialized.") + } + } + + override def getIterationRuntimeContext: IterationRuntimeContext = { + if (this.runtimeContext == null) { + throw new IllegalStateException("The runtime context has not been initialized.") + } + else { + this.runtimeContext match { + case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext + case _ => + throw new IllegalStateException("This stub is not part of an iteration step function.") + } + } + } + + // -------------------------------------------------------------------------------------------- + // Default life cycle methods + // 
-------------------------------------------------------------------------------------------- + + @throws[Exception] + override def open(parameters: Configuration) { + } + + @throws[Exception] + override def close() { + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala index 23293a6..a4fec64 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala @@ -18,8 +18,16 @@ package org.apache.flink.streaming.api.scala.function.util +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction} -import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction +import org.apache.flink.streaming.api.functions.windowing.{RichProcessWindowFunction => JRichProcessWindowFunction} +import org.apache.flink.streaming.api.functions.windowing.{RichProcessAllWindowFunction => JRichProcessAllWindowFunction} +import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction} +import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction} +import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => 
ScalaProcessAllWindowFunction} +import org.apache.flink.streaming.api.scala.function.{RichProcessWindowFunction => ScalaRichProcessWindowFunction} +import org.apache.flink.streaming.api.scala.function.{RichProcessAllWindowFunction => ScalaRichProcessAllWindowFunction} import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector @@ -34,8 +42,8 @@ import scala.collection.JavaConverters._ * - Java WindowFunction: java.lang.Iterable */ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( - private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W]) - extends JProcessWindowFunction[IN, OUT, KEY, W] { + private[this] val func: ScalaProcessWindowFunction[IN, OUT, KEY, W]) + extends JRichProcessWindowFunction[IN, OUT, KEY, W] { override def process( key: KEY, @@ -47,4 +55,75 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( } func.process(key, ctx, elements.asScala, out) } + + override def setRuntimeContext(t: RuntimeContext): Unit = { + super.setRuntimeContext(t) + func match { + case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t) + case _ => + } + } + + override def open(parameters: Configuration): Unit = { + super.open(parameters) + func match { + case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters) + case _ => + } + } + + override def close(): Unit = { + super.close() + func match { + case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close() + case _ => + } + } +} + +/** + * A wrapper function that exposes a Scala ProcessWindowFunction + * as a ProcessWindowFunction function. 
+ * + * The Scala and Java Window functions differ in their type of "Iterable": + * - Scala WindowFunction: scala.Iterable + * - Java WindowFunction: java.lang.Iterable + */ +final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window]( + private[this] val func: ScalaProcessAllWindowFunction[IN, OUT, W]) + extends JRichProcessAllWindowFunction[IN, OUT, W] { + + override def process( + context: JProcessAllWindowFunction[IN, OUT, W]#Context, + elements: java.lang.Iterable[IN], + out: Collector[OUT]): Unit = { + val ctx = new func.Context { + override def window = context.window + } + func.process(ctx, elements.asScala, out) + } + + override def setRuntimeContext(t: RuntimeContext): Unit = { + super.setRuntimeContext(t) + func match { + case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t) + case _ => + } + } + + override def open(parameters: Configuration): Unit = { + super.open(parameters) + func match { + case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters) + case _ => + } + } + + override def close(): Unit = { + super.close() + func match { + case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.close() + case _ => + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index 7e067a0..ee9f50c 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -26,7 +26,7 @@ import 
org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.operators.OneInputStreamOperator -import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction} +import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction, WindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.evictors.CountEvictor @@ -356,6 +356,85 @@ class AllWindowTranslationTest { } @Test + def testReduceWithProcessWindowFunctionEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) + .reduce( + new DummyReducer, new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def process(context: Context, + elements: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = { + elements foreach ( x => out.collect(x)) + } + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), 
(String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testReduceWithProcessWindowFunctionProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1))) + .reduce( + new DummyReducer, new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test def testApplyWithPreReducerEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -566,6 +645,72 @@ class AllWindowTranslationTest { } @Test + def testAggregateWithProcessWindowFunctionEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), 
("hello", 2)) + + val window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) + .aggregate(new DummyAggregator(), new TestProcessAllWindowFunction()) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testAggregateWithProcessWindowFunctionProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1))) + .aggregate(new DummyAggregator(), new TestProcessAllWindowFunction()) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + 
assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test def testAggregateWithWindowFunctionEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -789,6 +934,88 @@ class AllWindowTranslationTest { } @Test + def testFoldWithProcessWindowFunctionEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold( + ("", "", 1), + new DummyFolder, + new ProcessAllWindowFunction[(String, String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, + input: Iterable[(String, String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def 
testFoldWithProcessWindowFunctionProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold( + ("", "", 1), + new DummyFolder, + new ProcessAllWindowFunction[(String, String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, + input: Iterable[(String, String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test def testApplyWithPreFolderEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -951,6 +1178,84 @@ class AllWindowTranslationTest { } @Test + def testProcessEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + 
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .process( + new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testProcessProcessingTimeTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .process( + new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator 
= operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test def testApplyEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -1095,6 +1400,46 @@ class AllWindowTranslationTest { } @Test + def testProcessWithCustomTrigger() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .trigger(CountTrigger.of(1)) + .process( + new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + 
processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test def testReduceWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -1211,6 +1556,46 @@ class AllWindowTranslationTest { ("hello", 1)) } + @Test + def testProcessWithEvictor() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .evictor(CountEvictor.of(100)) + .process( + new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } /** * Ensure that we get some output from the given operator when pushing in an element and @@ -1218,10 
+1603,10 @@ class AllWindowTranslationTest { */ @throws[Exception] private def processElementAndEnsureOutput[K, IN, OUT]( - operator: OneInputStreamOperator[IN, OUT], - keySelector: KeySelector[IN, K], - keyType: TypeInformation[K], - element: IN) { + operator: OneInputStreamOperator[IN, OUT], + keySelector: KeySelector[IN, K], + keyType: TypeInformation[K], + element: IN) { val testHarness = new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType) @@ -1243,7 +1628,8 @@ class AllWindowTranslationTest { } } -class TestAllWindowFunction extends AllWindowFunction[(String, Int), (String, Int), TimeWindow] { +class TestAllWindowFunction + extends AllWindowFunction[(String, Int), (String, Int), TimeWindow] { override def apply( window: TimeWindow, @@ -1253,3 +1639,15 @@ class TestAllWindowFunction extends AllWindowFunction[(String, Int), (String, In input.foreach(out.collect) } } + +class TestProcessAllWindowFunction + extends ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] { + + override def process( + context: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = { + + input.foreach(out.collect) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala index a23145c..dc38758 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction} +import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time @@ -150,7 +150,6 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { } @Test - @Ignore def testFoldWithProcessWindowFunction(): Unit = { WindowFoldITCase.testResults = mutable.MutableList() CheckingIdentityRichProcessWindowFunction.reset() @@ -310,6 +309,63 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { CheckingIdentityRichAllWindowFunction.checkRichMethodCalls() } + + @Test + def testFoldAllWithProcessWindowFunction(): Unit = { + WindowFoldITCase.testResults = mutable.MutableList() + CheckingIdentityRichProcessAllWindowFunction.reset() + + val foldFunc = new FoldFunction[(String, Int), (String, Int)] { + override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = { + (accumulator._1 + value._1, accumulator._2 + value._2) + } + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + 
ctx.collect(("a", 3)) + ctx.collect(("b", 4)) + ctx.collect(("a", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 5)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() { + } + }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor) + + source1 + .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .fold( + ("R:", 0), + foldFunc, + new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]()) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowFoldITCase.testResults += value.toString + } + }) + + env.execute("Fold All-Window Test") + + val expectedResult = mutable.MutableList( + "(R:aaa,3)", + "(R:bababa,24)") + + assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted) + + CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls() + } } http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala index bfbe6ee..eb9f361 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala @@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction -import 
org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction} +import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time @@ -87,7 +87,6 @@ class WindowFunctionITCase { } @Test - @Ignore def testRichProcessWindowFunction(): Unit = { WindowFunctionITCase.testResults = mutable.MutableList() CheckingIdentityRichProcessWindowFunction.reset() @@ -183,6 +182,54 @@ class WindowFunctionITCase { CheckingIdentityRichAllWindowFunction.checkRichMethodCalls() } + + @Test + def testRichProcessAllWindowFunction(): Unit = { + WindowFunctionITCase.testResults = mutable.MutableList() + CheckingIdentityRichProcessAllWindowFunction.reset() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() {} + + }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor) + + source1 + .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .process(new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]()) + .addSink(new 
SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowFunctionITCase.testResults += value.toString + } + }) + + env.execute("RichAllWindowFunction Test") + + val expectedResult = mutable.MutableList( + "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)", + "(b,3)", "(b,4)", "(b,5)") + + assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted) + + CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls() + } } object WindowFunctionITCase { http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala index 5418108..ee1dbfd 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction} +import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction} import org.apache.flink.streaming.api.watermark.Watermark import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time @@ -149,7 +149,6 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase { } @Test - @Ignore def testReduceWithProcessWindowFunction(): Unit = { WindowReduceITCase.testResults = mutable.MutableList() CheckingIdentityRichProcessWindowFunction.reset() @@ -307,6 +306,62 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase { CheckingIdentityRichAllWindowFunction.checkRichMethodCalls() } + + @Test + def testReduceAllWithProcessWindowFunction(): Unit = { + WindowReduceITCase.testResults = mutable.MutableList() + CheckingIdentityRichProcessAllWindowFunction.reset() + + val reduceFunc = new ReduceFunction[(String, Int)] { + override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = { + (a._1 + b._1, a._2 + b._2) + } + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("a", 3)) + ctx.collect(("b", 4)) + ctx.collect(("a", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 5)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() { + } + }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor) + + source1 + .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .reduce( + reduceFunc, + new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]()) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowReduceITCase.testResults += value.toString + } + }) + + env.execute("Fold All-Window Test") + + val expectedResult 
= mutable.MutableList( + "(aaa,3)", + "(bababa,24)") + + assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted) + + CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls() + } } object WindowReduceITCase { http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala new file mode 100644 index 0000000..df005fa --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala.testutils + +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.scala.function.RichProcessAllWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + + +class CheckingIdentityRichProcessAllWindowFunction[T, W <: Window] + extends RichProcessAllWindowFunction[T, T, W] { + + override def process(context: Context, input: Iterable[T], out: Collector[T]): Unit = { + for (value <- input) { + out.collect(value) + } + } + + override def open(conf: Configuration): Unit = { + super.open(conf) + CheckingIdentityRichProcessAllWindowFunction.openCalled = true + } + + override def close(): Unit = { + super.close() + CheckingIdentityRichProcessAllWindowFunction.closeCalled = true + } + + override def setRuntimeContext(context: RuntimeContext): Unit = { + super.setRuntimeContext(context) + CheckingIdentityRichProcessAllWindowFunction.contextSet = true + } +} + +object CheckingIdentityRichProcessAllWindowFunction { + + @volatile + private[CheckingIdentityRichProcessAllWindowFunction] var closeCalled = false + + @volatile + private[CheckingIdentityRichProcessAllWindowFunction] var openCalled = false + + @volatile + private[CheckingIdentityRichProcessAllWindowFunction] var contextSet = false + + def reset(): Unit = { + closeCalled = false + openCalled = false + contextSet = false + } + + def checkRichMethodCalls(): Unit = { + if (!contextSet) { + throw new AssertionError("context not set") + } + if (!openCalled) { + throw new AssertionError("open() not called") + } + if (!closeCalled) { + throw new AssertionError("close() not called") + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java index 7d37d1a..903179d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; @@ -253,6 +254,78 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase { Assert.assertEquals(expectedResult, testResults); } + @Test + public void testFoldProcessAllWindow() throws Exception { + + testResults = new ArrayList<>(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + + DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { + ctx.collect(Tuple2.of("a", 0)); + ctx.collect(Tuple2.of("a", 1)); + ctx.collect(Tuple2.of("a", 2)); + + ctx.collect(Tuple2.of("b", 3)); + ctx.collect(Tuple2.of("b", 4)); + ctx.collect(Tuple2.of("b", 5)); + + ctx.collect(Tuple2.of("a", 6)); + ctx.collect(Tuple2.of("a", 7)); + 
ctx.collect(Tuple2.of("a", 8)); + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + @Override + public void cancel() {} + + }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()); + + source1 + .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .fold(Tuple2.of(0, "R:"), new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() { + @Override + public Tuple2<Integer, String> fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws Exception { + accumulator.f1 += value.f0; + accumulator.f0 += value.f1; + return accumulator; + } + }, new ProcessAllWindowFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, TimeWindow>() { + @Override + public void process(Context context, Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, Integer, Integer>> out) throws Exception { + int i = 0; + for (Tuple2<Integer, String> in : elements) { + out.collect(new Tuple3<>(in.f1, in.f0, i++)); + } + } + }) + .addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() { + @Override + public void invoke(Tuple3<String, Integer, Integer> value) throws Exception { + testResults.add(value.toString()); + } + }); + + env.execute("Fold Process Window Test"); + + List<String> expectedResult = Arrays.asList( + "(R:aaa,3,0)", + "(R:aaa,21,0)", + "(R:bbb,12,0)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> { @Override
