[FLINK-4997] [streaming] Add ProcessWindowFunction to Scala API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86dff0e6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86dff0e6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86dff0e6 Branch: refs/heads/master Commit: 86dff0e6d584027994dd1320845169cc8b1a83d5 Parents: 1dcb2dc Author: Ventura Del Monte <[email protected]> Authored: Wed Nov 9 10:49:47 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Feb 17 17:15:51 2017 +0100 ---------------------------------------------------------------------- .../streaming/api/scala/WindowedStream.scala | 144 ++++++++++++++++++- .../scala/function/ProcessWindowFunction.scala | 61 ++++++++ .../function/RichProcessWindowFunction.scala | 87 +++++++++++ .../ScalaProcessWindowFunctionWrapper.scala | 64 +++++++++ .../streaming/api/scala/WindowFoldITCase.scala | 64 ++++++++- .../api/scala/WindowFunctionITCase.scala | 54 ++++++- .../api/scala/WindowReduceITCase.scala | 64 ++++++++- ...ckingIdentityRichProcessWindowFunction.scala | 81 +++++++++++ 8 files changed, 605 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index ab27820..96ff334 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -18,14 +18,16 @@ package org.apache.flink.streaming.api.scala +import org.apache.flink.annotation.{Public, PublicEvolving} 
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction} import org.apache.flink.annotation.{PublicEvolving, Public} import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} -import org.apache.flink.streaming.api.scala.function.WindowFunction -import org.apache.flink.streaming.api.scala.function.util.{ScalaFoldFunction, ScalaReduceFunction, ScalaWindowFunction, ScalaWindowFunctionWrapper} +import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.scala.function.util._ import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.Trigger @@ -99,7 +101,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { // ------------------------------------------------------------------------ // --------------------------- reduce() ----------------------------------- - + /** * Applies a reduce function to the window. The window function is called for each evaluation * of the window for each key individually. The output of the reduce function is interpreted @@ -198,10 +200,58 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { asScalaStream(javaStream.reduce(reducer, applyFunction, implicitly[TypeInformation[R]])) } + + /** + * Applies the given reduce function to each window. The window reduced value is + * then passed as input of the window function. The output of the window function + * is interpreted as a regular non-windowed stream. 
+ * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param function The process window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + def reduce[R: TypeInformation]( + preAggregator: (T, T) => T, + function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = { + + val cleanedPreAggregator = clean(preAggregator) + val cleanedWindowFunction = clean(function) + + val reducer = new ScalaReduceFunction[T](cleanedPreAggregator) + val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction) + + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.reduce(reducer, applyFunction, resultType)) + } + + /** + * Applies the given reduce function to each window. The window reduced value is + * then passed as input of the window function. The output of the window function + * is interpreted as a regular non-windowed stream. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param function The process window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + def reduce[R: TypeInformation]( + preAggregator: ReduceFunction[T], + function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = { + + val cleanedPreAggregator = clean(preAggregator) + val cleanedWindowFunction = clean(function) + + val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction) + + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.reduce(cleanedPreAggregator, applyFunction, resultType)) + } + // -------------------------- aggregate() --------------------------------- /** - * Applies the given aggregation function to each window and key. The aggregation function + * Applies the given aggregation function to each window and key. 
The aggregation function * is called for each element, aggregating values incrementally and keeping the state to * one accumulator per key and window. * @@ -213,7 +263,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]] val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] - + asScalaStream(javaStream.aggregate( clean(aggregateFunction), accumulatorType, resultType)) } @@ -241,7 +291,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]] val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]] val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] - + asScalaStream(javaStream.aggregate( cleanedPreAggregator, applyFunction, accumulatorType, aggregationResultType, resultType)) @@ -277,7 +327,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } // ---------------------------- fold() ------------------------------------ - + /** * Applies the given fold function to each window. The window function is called for each * evaluation of the window for each key individually. The output of the reduce function is @@ -379,9 +429,89 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, resultType)) } + + /** + * Applies the given fold function to each window. The window folded value is + * then passed as input of the process window function. + * The output of the process window function is interpreted as a regular non-windowed stream. + * + * @param initialValue The initial value of the fold + * @param foldFunction The fold function that is used for incremental aggregation + * @param function The process window function. 
+ * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + def fold[R: TypeInformation, ACC: TypeInformation]( + initialValue: ACC, + foldFunction: (ACC, T) => ACC, + function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = { + + val cleanedFunction = clean(function) + val cleanedFoldFunction = clean(foldFunction) + + val folder = new ScalaFoldFunction[T, ACC](cleanedFoldFunction) + val applyFunction = new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](cleanedFunction) + + asScalaStream(javaStream.fold( + initialValue, + folder, + applyFunction, + implicitly[TypeInformation[ACC]], + implicitly[TypeInformation[R]])) + } + + /** + * Applies the given fold function to each window. The window folded value is + * then passed as input of the process window function. + * The output of the process window function is interpreted as a regular non-windowed stream. + * + * @param initialValue The initial value of the fold + * @param foldFunction The fold function that is used for incremental aggregation + * @param function The process window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + def fold[R: TypeInformation, ACC: TypeInformation]( + initialValue: ACC, + foldFunction: FoldFunction[T, ACC], + function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = { + + val cleanedFunction = clean(function) + val cleanedFoldFunction = clean(foldFunction) + + val applyFunction = new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](cleanedFunction) + + asScalaStream(javaStream.fold( + initialValue, + cleanedFoldFunction, + applyFunction, + implicitly[TypeInformation[ACC]], + implicitly[TypeInformation[R]])) + } + // ---------------------------- apply() ------------------------------------- /** + * Applies the given window function to each window. 
The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Note that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of pre-aggregation. + * + * @param function The process window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + def process[R: TypeInformation]( + function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = { + + val cleanFunction = clean(function) + val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanFunction) + asScalaStream(javaStream.process(applyFunction, implicitly[TypeInformation[R]])) + } + + /** + * Applies the given window function to each window. The window function is called for each * evaluation of the window for each key individually. The output of the window function is * interpreted as a regular non-windowed stream. http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala new file mode 100644 index 0000000..79f3918 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.function + +import java.io.Serializable + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.functions.Function +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +/** + * Base abstract class for functions that are evaluated over keyed (grouped) + * windows using a context for retrieving extra information. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + * @tparam KEY The type of the key. + * @tparam W The type of the window. + */ +@PublicEvolving +abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable { + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param context The context in which the window is being evaluated. + * @param elements The elements in the window being evaluated. + * @param out A collector for emitting elements. + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + @throws[Exception] + def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]) + + /** + * The context holding window metadata + */ + abstract class Context { + /** + * @return The window that is being evaluated. 
+ */ + def window: W + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala new file mode 100644 index 0000000..320685a --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala.function + +import java.beans.Transient + +import org.apache.flink.annotation.Public +import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Base abstract class for functions that are evaluated over + * keyed (grouped) windows using a context for retrieving extra information. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + * @tparam KEY The type of the key. + * @tparam W The type of the window. + */ +@Public +abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window] + extends ProcessWindowFunction[IN, OUT, KEY, W] + with RichFunction { + + @Transient + private var runtimeContext: RuntimeContext = null + + // -------------------------------------------------------------------------------------------- + // Runtime context access + // -------------------------------------------------------------------------------------------- + + override def setRuntimeContext(t: RuntimeContext) { + this.runtimeContext = t + } + + override def getRuntimeContext: RuntimeContext = { + if (this.runtimeContext != null) { + this.runtimeContext + } + else { + throw new IllegalStateException("The runtime context has not been initialized.") + } + } + + override def getIterationRuntimeContext: IterationRuntimeContext = { + if (this.runtimeContext == null) { + throw new IllegalStateException("The runtime context has not been initialized.") + } + else { + this.runtimeContext match { + case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext + case _ => + throw new IllegalStateException("This stub is not part of an iteration step function.") + } + } + } + + // -------------------------------------------------------------------------------------------- + // Default life cycle methods + // 
-------------------------------------------------------------------------------------------- + + @throws[Exception] + override def open(parameters: Configuration) { + } + + @throws[Exception] + override def close() { + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala new file mode 100644 index 0000000..4a20371 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala.function.util + +import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext} +import org.apache.flink.api.java.operators.translation.WrappingFunction +import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction} +import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +import scala.collection.JavaConverters._ + +/** + * A wrapper function that exposes a Scala ProcessWindowFunction + * as a Java ProcessWindowFunction. + * + * The Scala and Java Window functions differ in their type of "Iterable": + * - Scala WindowFunction: scala.Iterable + * - Java WindowFunction: java.lang.Iterable + */ +final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( + private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W]) + extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func) + with JProcessWindowFunctionTrait[IN, OUT, KEY, W] { + + override def process( + key: KEY, + context: JProcessWindowFunction[IN, OUT, KEY, W]#Context, + elements: java.lang.Iterable[IN], + out: Collector[OUT]): Unit = { + val ctx = new func.Context { + override def window = context.window + } + func.process(key, ctx, elements.asScala, out) + } + + override def getRuntimeContext: RuntimeContext = { + throw new RuntimeException("This should never be called") + } + + override def getIterationRuntimeContext: IterationRuntimeContext = { + throw new RuntimeException("This should never be called") + } +} + +private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W] + extends JProcessWindowFunction[IN, OUT, KEY, W] http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala index 83697ce..a23145c 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala @@ -26,13 +26,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichWindowFunction, CheckingIdentityRichAllWindowFunction} +import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.junit.Test +import org.junit.{Ignore, Test} import org.junit.Assert._ import scala.collection.mutable @@ -150,6 +150,66 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { } @Test + @Ignore + def testFoldWithProcessWindowFunction(): Unit = { + WindowFoldITCase.testResults = mutable.MutableList() + CheckingIdentityRichProcessWindowFunction.reset() + + val foldFunc = new FoldFunction[(String, Int), (Int, String)] { + override def fold(accumulator: (Int, String), value: (String, Int)): (Int, String) = { + 
(accumulator._1 + value._2, accumulator._2 + value._1) + } + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() { + } + }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor) + + source1 + .keyBy(0) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .fold( + (0, "R:"), + foldFunc, + new CheckingIdentityRichProcessWindowFunction[(Int, String), Tuple, TimeWindow]()) + .addSink(new SinkFunction[(Int, String)]() { + def invoke(value: (Int, String)) { + WindowFoldITCase.testResults += value.toString + } + }) + + env.execute("Fold Process Window Test") + + val expectedResult = mutable.MutableList( + "(3,R:aaa)", + "(21,R:aaa)", + "(12,R:bbb)") + + assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted) + + CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls() + } + + @Test def testFoldAllWindow(): Unit = { WindowFoldITCase.testResults = mutable.MutableList() http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala index c38f422..bfbe6ee 100644 --- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala @@ -25,13 +25,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction} +import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.junit.Assert._ -import org.junit.Test +import org.junit.{Ignore, Test} import scala.collection.mutable @@ -87,6 +87,56 @@ class WindowFunctionITCase { } @Test + @Ignore + def testRichProcessWindowFunction(): Unit = { + WindowFunctionITCase.testResults = mutable.MutableList() + CheckingIdentityRichProcessWindowFunction.reset() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + + // source is finite, so it will have an implicit MAX watermark when it finishes 
+ } + + def cancel() {} + + }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor) + + source1 + .keyBy(0) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .process(new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]()) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowFunctionITCase.testResults += value.toString + } + }) + + env.execute("RichProcessWindowFunction Test") + + val expectedResult = mutable.MutableList( + "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)", + "(b,3)", "(b,4)", "(b,5)") + + assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted) + + CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls() + } + + @Test def testRichAllWindowFunction(): Unit = { WindowFunctionITCase.testResults = mutable.MutableList() CheckingIdentityRichAllWindowFunction.reset() http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala index 9666266..5418108 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala @@ -26,15 +26,14 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction -import 
org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction} +import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase - import org.junit.Assert._ -import org.junit.Test +import org.junit.{Ignore, Test} import scala.collection.mutable @@ -150,6 +149,65 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase { } @Test + @Ignore + def testReduceWithProcessWindowFunction(): Unit = { + WindowReduceITCase.testResults = mutable.MutableList() + CheckingIdentityRichProcessWindowFunction.reset() + + val reduceFunc = new ReduceFunction[(String, Int)] { + override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = { + (a._1 + b._1, a._2 + b._2) + } + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() { + } + }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor) + + source1 + .keyBy(0) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .reduce( 
+ reduceFunc, + new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]()) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowReduceITCase.testResults += value.toString + } + }) + + env.execute("Reduce Process Window Test") + + val expectedResult = mutable.MutableList( + "(aaa,3)", + "(aaa,21)", + "(bbb,12)") + + assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted) + + CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls() + } + + @Test def testReduceAllWindow(): Unit = { WindowReduceITCase.testResults = mutable.MutableList() http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala new file mode 100644 index 0000000..d62f2d3 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.testutils + +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + + +class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window] + extends RichProcessWindowFunction[T, T, K, W] { + + override def process(key: K, context: Context, input: Iterable[T], out: Collector[T]): Unit = { + for (value <- input) { + out.collect(value) + } + } + + override def open(conf: Configuration): Unit = { + super.open(conf) + CheckingIdentityRichProcessWindowFunction.openCalled = true + } + + override def close(): Unit = { + super.close() + CheckingIdentityRichProcessWindowFunction.closeCalled = true + } + + override def setRuntimeContext(context: RuntimeContext): Unit = { + super.setRuntimeContext(context) + CheckingIdentityRichProcessWindowFunction.contextSet = true + } +} + +object CheckingIdentityRichProcessWindowFunction { + + @volatile + private[CheckingIdentityRichProcessWindowFunction] var closeCalled = false + + @volatile + private[CheckingIdentityRichProcessWindowFunction] var openCalled = false + + @volatile + private[CheckingIdentityRichProcessWindowFunction] var contextSet = false + + def reset(): Unit = { + closeCalled = false + openCalled = false + contextSet = false + } + + def checkRichMethodCalls(): Unit = { + if (!contextSet) { + throw new 
AssertionError("context not set") + } + if (!openCalled) { + throw new AssertionError("open() not called") + } + if (!closeCalled) { + throw new AssertionError("close() not called") + } + } +}
