http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index 6273e54..c738955 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -19,247 +19,1017 @@ package org.apache.flink.streaming.api.scala -import java.util.concurrent.TimeUnit - -import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDescriptor} -import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.functions.{FoldFunction, RichFoldFunction, RichReduceFunction} +import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction} +import org.apache.flink.streaming.api.operators.OneInputStreamOperator +import org.apache.flink.streaming.api.scala.function.AllWindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows} -import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, 
TimeEvictor} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor import org.apache.flink.streaming.api.windowing.time.Time -import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger} -import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, EventTimeTrigger, ProcessingTimeTrigger, Trigger} +import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window} import org.apache.flink.streaming.runtime.operators.windowing._ -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness import org.apache.flink.util.Collector - import org.junit.Assert._ -import org.junit.{Ignore, Test} +import org.junit.Test + +/** + * These tests verify that the api calls on [[AllWindowedStream]] instantiate the correct + * window operator. + * + * We also create a test harness and push one element into the operator to verify + * that we get some output. + */ +class AllWindowTranslationTest { + + /** + * .reduce() does not support [[RichReduceFunction]], since the reduce function is used + * internally in a [[org.apache.flink.api.common.state.ReducingState]]. 
+ */ + @Test(expected = classOf[UnsupportedOperationException]) + def testReduceWithRichReducerFails() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val source = env.fromElements(("hello", 1), ("hello", 2)) + + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + source + .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .reduce(new RichReduceFunction[(String, Int)] { + override def reduce(value1: (String, Int), value2: (String, Int)) = null + }) -class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { + fail("exception was not thrown") + } /** - * These tests ensure that the fast aligned time windows operator is used if the - * conditions are right. - * - * TODO: update once we have optimized aligned time windows operator for all-windows - */ - @Ignore + * .fold() does not support [[RichFoldFunction]], since the fold function is used internally + * in a [[org.apache.flink.api.common.state.FoldingState]]. 
+ */ + @Test(expected = classOf[UnsupportedOperationException]) + def testFoldWithRichFolderFails() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val source = env.fromElements(("hello", 1), ("hello", 2)) + + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + source + .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold(("", 0), new RichFoldFunction[(String, Int), (String, Int)] { + override def fold(accumulator: (String, Int), value: (String, Int)) = null + }) + + fail("exception was not thrown") + } + + @Test + def testSessionWithFoldFails() { + // verify that fold does not work with merging windows + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val windowedStream = env.fromElements("Hello", "Ciao") + .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5))) + + try + windowedStream.fold("", new FoldFunction[String, String]() { + @throws[Exception] + def fold(accumulator: String, value: String): String = accumulator + }) + + catch { + case _: UnsupportedOperationException => + // expected + // use a catch to ensure that the exception is thrown by the fold + return + } + + fail("The fold call should fail.") + } + + @Test + def testMergingAssignerWithNonMergingTriggerFails() { + // verify that we check for trigger compatibility + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val windowedStream = env.fromElements("Hello", "Ciao") + .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5))) + + try + windowedStream.trigger(new Trigger[String, TimeWindow]() { + def onElement( + element: String, + timestamp: Long, + window: TimeWindow, + ctx: Trigger.TriggerContext) = null + + def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = null + + def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = null + + override def canMerge = false + + def clear(window: TimeWindow, ctx: Trigger.TriggerContext) {} + 
}) + + catch { + case _: UnsupportedOperationException => + // expected + // use a catch to ensure that the exception is thrown by the trigger call + return + } + + fail("The trigger call should fail.") + } + + @Test + def testReduceEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .reduce(new DummyReducer) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + @Test - def testFastTimeWindows(): Unit = { + def testReduceProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) - val reducer = new DummyReducer + val window1 = source + .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .reduce(new DummyReducer) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + 
assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testReduceEventTimeWithScalaFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) val window1 = source - .windowAll(SlidingEventTimeWindows.of( - Time.of(1, TimeUnit.SECONDS), - Time.of(100, TimeUnit.MILLISECONDS))) - .reduce(reducer) + .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .reduce( (x, _) => x ) - val transform1 = window1.javaStream.getTransformation + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator1 = transform1.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) - assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]]) + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] - val window2 = source - .keyBy(0) - .windowAll(SlidingEventTimeWindows.of( - Time.of(1, TimeUnit.SECONDS), - Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { - def apply( - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) 
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testReduceWithWindowFunctionEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) - val transform2 = window2.javaStream.getTransformation + val window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) + .reduce( + new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def apply( + window: TimeWindow, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) + + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator2 = transform2.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] - assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]]) + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) } @Test - def testNonEvicting(): Unit = { + 
def testReduceWithWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) - val reducer = new DummyReducer + val window1 = source + .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1))) + .reduce( + new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def apply( + window: TimeWindow, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testApplyWithPreReducerEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) val window1 = source - .windowAll(SlidingEventTimeWindows.of( - Time.of(1, TimeUnit.SECONDS), - Time.of(100, TimeUnit.MILLISECONDS))) - .trigger(CountTrigger.of(100)) - .reduce(reducer) + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply( + new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] { + 
override def apply( + window: TimeWindow, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) - val transform1 = window1.javaStream.getTransformation + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator1 = transform1.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) - assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) - val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]] - assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) - assertTrue( - winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) - val window2 = source - .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) - .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { - def apply( - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } - val transform2 = window2.javaStream.getTransformation + @Test + def testReduceWithWindowFunctionEventTimeWithScalaFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source 
= env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) + .reduce( + { (x, _) => x }, + { (_, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} }) + + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator2 = transform2.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) - assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]]) - val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]] - assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) - assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) } + @Test - def testEvicting(): Unit = { + def testFoldEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) - val reducer = new DummyReducer + val window1 = source + .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold(("", "", 1), new DummyFolder) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + 
+ val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testFoldProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) val window1 = source - .windowAll(SlidingProcessingTimeWindows.of( - Time.of(1, TimeUnit.SECONDS), - Time.of(100, TimeUnit.MILLISECONDS))) - .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) - .reduce(reducer) + .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold(("", "", 1), new DummyFolder) - val transform1 = window1.javaStream.getTransformation + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator1 = transform1.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] - assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]]) - val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]] - assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger]) - assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]]) - 
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) - assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testFoldEventTimeWithScalaFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + val source = env.fromElements(("hello", 1), ("hello", 2)) - val window2 = source - .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) - .evictor(CountEvictor.of(1000)) - .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { - def apply( - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) + val window1 = source + .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold(("", "", 1)) { (acc, _) => acc } - val transform2 = window2.javaStream.getTransformation + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator2 = transform2.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] - assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]]) - val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]] - 
assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]]) - assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) - assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) } + @Test - def testPreReduce(): Unit = { + def testFoldWithWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) - val reducer = new DummyReducer + val window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold( + ("", "", 1), + new DummyFolder, + new AllWindowFunction[(String, String, Int), (String, Int), TimeWindow] { + override def apply( + window: TimeWindow, + input: Iterable[(String, String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + 
assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testFoldWithWindowFunctionProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) val window1 = source - .keyBy(0) - .window(SlidingEventTimeWindows.of( - Time.of(1, TimeUnit.SECONDS), - Time.of(100, TimeUnit.MILLISECONDS))) - .trigger(CountTrigger.of(100)) - .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { - def apply( - tuple: Tuple, - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) + .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold( + ("", "", 1), + new DummyFolder, + new AllWindowFunction[(String, String, Int), (String, Int), TimeWindow] { + override def apply( + window: TimeWindow, + input: Iterable[(String, String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))} + }) - val transform1 = window1.javaStream.getTransformation + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator1 = transform1.getOperator - - assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) - val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]] - assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) - assertTrue( - winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) - - - val window2 = source - .keyBy(0) - .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) - .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { - def apply( - tuple: Tuple, - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testApplyWithPreFolderEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .apply( + ("", "", 1), + new DummyFolder, + new AllWindowFunction[(String, String, Int), (String, String, Int), TimeWindow] { + override def apply( + window: TimeWindow, + input: Iterable[(String, String, Int)], + out: Collector[(String, String, Int)]): Unit = + input foreach {x => out.collect((x._1, x._2, x._3))} + }) - val transform2 = window2.javaStream.getTransformation + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator2 = transform2.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + 
.asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `fold` with a Scala fold closure and a Scala window closure on tumbling event-time
   * windows: expects a [[WindowOperator]] with an [[EventTimeTrigger]] and folding state.
   */
  @Test
  def testFoldWithWindowFunctionEventTimeWithScalaFunction(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .fold(
        ("", "", 1),
        { (acc: (String, String, Int), _) => acc },
        { (_, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
          in foreach { x => out.collect((x._1, x._3)) }
        })

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `apply` with an [[AllWindowFunction]] on tumbling event-time windows: expects a
   * [[WindowOperator]] with an [[EventTimeTrigger]] and list state.
   */
  @Test
  def testApplyEventTime(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .apply(
        new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
          override def apply(
              window: TimeWindow,
              input: Iterable[(String, Int)],
              out: Collector[(String, Int)]): Unit =
            input foreach { x => out.collect((x._1, x._2)) }
        })

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `apply` with an [[AllWindowFunction]] on tumbling processing-time windows: expects a
   * [[WindowOperator]] with a [[ProcessingTimeTrigger]] and list state.
   */
  @Test
  def testApplyProcessingTimeTime(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .apply(
        new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
          override def apply(
              window: TimeWindow,
              input: Iterable[(String, Int)],
              out: Collector[(String, Int)]): Unit =
            input foreach { x => out.collect((x._1, x._2)) }
        })

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `apply` with a Scala closure on tumbling event-time windows: expects a
   * [[WindowOperator]] with an [[EventTimeTrigger]] and list state.
   */
  @Test
  def testApplyEventTimeWithScalaFunction(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .apply { (window, in, out: Collector[(String, Int)]) =>
        in foreach { x => out.collect(x) }
      }

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `reduce` on sliding event-time windows with an explicit [[CountTrigger]]: expects a
   * [[WindowOperator]] that carries the custom trigger and reducing state.
   */
  @Test
  def testReduceWithCustomTrigger(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .trigger(CountTrigger.of(1))
      .reduce(new DummyReducer)

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `fold` on sliding event-time windows with an explicit [[CountTrigger]]: expects a
   * [[WindowOperator]] that carries the custom trigger and folding state.
   */
  @Test
  def testFoldWithCustomTrigger(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .trigger(CountTrigger.of(1))
      .fold(("", "", 1), new DummyFolder)

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `apply` on tumbling event-time windows with an explicit [[CountTrigger]]: expects a
   * [[WindowOperator]] that carries the custom trigger and list state.
   */
  @Test
  def testApplyWithCustomTrigger(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .trigger(CountTrigger.of(1))
      .apply(
        new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
          override def apply(
              window: TimeWindow,
              input: Iterable[(String, Int)],
              out: Collector[(String, Int)]): Unit =
            input foreach { x => out.collect((x._1, x._2)) }
        })

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `reduce` on sliding event-time windows with a [[CountEvictor]]: expects an
   * [[EvictingWindowOperator]] with an [[EventTimeTrigger]], the evictor, and list state.
   */
  @Test
  def testReduceWithEvictor(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .evictor(CountEvictor.of(100))
      .reduce(new DummyReducer)

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[
        EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `fold` on sliding event-time windows with a [[CountEvictor]]: expects an
   * [[EvictingWindowOperator]] with an [[EventTimeTrigger]], the evictor, and list state.
   */
  @Test
  def testFoldWithEvictor(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .evictor(CountEvictor.of(100))
      .fold(("", "", 1), new DummyFolder)

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[
        EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    // NOTE(review): only this test sets the output type by hand before running the
    // harness; presumably the fold path needs it at open() -- confirm.
    winOperator.setOutputType(
      window1.javaStream.getType.asInstanceOf[TypeInformation[(String, Int)]],
      new ExecutionConfig)

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * `apply` on tumbling event-time windows with a [[CountEvictor]]: expects an
   * [[EvictingWindowOperator]] with an [[EventTimeTrigger]], the evictor, and list state.
   */
  @Test
  def testApplyWithEvictor(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .evictor(CountEvictor.of(100))
      .apply(
        new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
          override def apply(
              window: TimeWindow,
              input: Iterable[(String, Int)],
              out: Collector[(String, Int)]): Unit =
            input foreach { x => out.collect((x._1, x._2)) }
        })

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }


  /**
   * Ensure that we get some output from the given operator when pushing in an element and
   * setting watermark and processing time to `Long.MaxValue`.
   *
   * The operator is wrapped in a keyed test harness. After `open()`, all further harness
   * calls run inside `try`/`finally`, so the harness is closed even when the assertion
   * fails or a harness call throws.
   *
   * @param operator    the operator under test
   * @param keySelector key selector handed to the test harness
   * @param keyType     type information for the key, handed to the test harness
   * @param element     the single element pushed into the operator (with timestamp 0)
   * @tparam K   key type
   * @tparam IN  input element type of the operator
   * @tparam OUT output element type of the operator
   */
  @throws[Exception]
  private def processElementAndEnsureOutput[K, IN, OUT](
      operator: OneInputStreamOperator[IN, OUT],
      keySelector: KeySelector[IN, K],
      keyType: TypeInformation[K],
      element: IN): Unit = {
    val testHarness =
      new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)

    testHarness.open()

    try {
      testHarness.setProcessingTime(0)
      testHarness.processWatermark(Long.MinValue)

      testHarness.processElement(new StreamRecord[IN](element, 0))

      // provoke any processing-time/event-time triggers
      testHarness.setProcessingTime(Long.MaxValue)
      testHarness.processWatermark(Long.MaxValue)

      // we at least get the two watermarks and should also see an output element
      assertTrue(
        "expected the two watermarks plus at least one output element",
        testHarness.getOutput.size >= 3)
    } finally {
      // always release the harness, including when the assertion above fails
      testHarness.close()
    }
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala new file mode 100644 index 0000000..ff97656 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.streaming.api.scala

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator, WindowOperator}
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
import org.junit.Assert._
import org.junit.{Ignore, Test}

/**
 * Translation tests for the "time" shortcut (`timeWindow`) on [[WindowedStream]]: each test
 * builds a small keyed, windowed pipeline and inspects the operator the API call produced.
 */
class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {

  /**
   * A sliding `timeWindow` followed by `reduce` should use the fast aligned
   * [[AggregatingProcessingTimeWindowOperator]] if the conditions are right.
   * Currently disabled via `@Ignore`.
   */
  @Test
  @Ignore
  def testReduceFastTimeWindows(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(("hello", 1), ("hello", 2))

    val size = Time.of(1, TimeUnit.SECONDS)
    val slide = Time.of(100, TimeUnit.MILLISECONDS)
    val reduced = input
      .keyBy(0)
      .timeWindow(size, slide)
      .reduce(new DummyReducer())

    val operator = reduced.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
      .getOperator

    assertTrue(operator.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
  }

  /**
   * A tumbling `timeWindow` followed by `apply` should use the fast aligned
   * [[AccumulatingProcessingTimeWindowOperator]] if the conditions are right.
   * Currently disabled via `@Ignore`.
   */
  @Test
  @Ignore
  def testApplyFastTimeWindows(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val input = env.fromElements(("hello", 1), ("hello", 2))

    // window function that emits nothing; only the translation is under test
    val noOpFunction = new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
      def apply(
          key: Tuple,
          window: TimeWindow,
          values: Iterable[(String, Int)],
          out: Collector[(String, Int)]): Unit = ()
    }

    val applied = input
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .apply(noOpFunction)

    val operator = applied.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
      .getOperator

    assertTrue(operator.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
  }

  /**
   * Sliding `timeWindow` + `reduce` under ingestion time: expects a [[WindowOperator]] with
   * an [[EventTimeTrigger]], a [[SlidingEventTimeWindows]] assigner and reducing state.
   */
  @Test
  def testReduceEventTimeWindows(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val input = env.fromElements(("hello", 1), ("hello", 2))

    val size = Time.of(1, TimeUnit.SECONDS)
    val slide = Time.of(100, TimeUnit.MILLISECONDS)
    val reduced = input
      .keyBy(0)
      .timeWindow(size, slide)
      .reduce(new DummyReducer())

    val operator = reduced.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
      .getOperator

    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _]])

    val windowOperator = operator.asInstanceOf[WindowOperator[_, _, _, _, _]]

    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
  }

  /**
   * Sliding `timeWindow` + `fold` under ingestion time: expects a [[WindowOperator]] with
   * an [[EventTimeTrigger]], a [[SlidingEventTimeWindows]] assigner and folding state.
   */
  @Test
  def testFoldEventTimeWindows(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val input = env.fromElements(("hello", 1), ("hello", 2))

    val size = Time.of(1, TimeUnit.SECONDS)
    val slide = Time.of(100, TimeUnit.MILLISECONDS)
    val folded = input
      .keyBy(0)
      .timeWindow(size, slide)
      .fold(("", "", 1), new DummyFolder())

    val operator = folded.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
      .getOperator

    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _]])

    val windowOperator = operator.asInstanceOf[WindowOperator[_, _, _, _, _]]

    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
  }

  /**
   * Sliding `timeWindow` + `apply` under ingestion time: expects a [[WindowOperator]] with
   * an [[EventTimeTrigger]], a [[SlidingEventTimeWindows]] assigner and list state.
   */
  @Test
  def testApplyEventTimeWindows(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val input = env.fromElements(("hello", 1), ("hello", 2))

    // the function body is never invoked here; only the translation is inspected
    val unimplementedFunction =
      new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow] {
        override def apply(
            key: Tuple,
            window: TimeWindow,
            input: Iterable[(String, Int)],
            out: Collector[(String, Int)]): Unit = ???
      }

    val size = Time.of(1, TimeUnit.SECONDS)
    val slide = Time.of(100, TimeUnit.MILLISECONDS)
    val applied = input
      .keyBy(0)
      .timeWindow(size, slide)
      .apply(unimplementedFunction)

    val operator = applied.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
      .getOperator

    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _]])

    val windowOperator = operator.asInstanceOf[WindowOperator[_, _, _, _, _]]

    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
  }
}
