Repository: flink Updated Branches: refs/heads/master 09fe4b0b8 -> 5368a7d32
http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index bd3fe3d..600645b 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -24,8 +24,8 @@ import org.apache.flink.api.common.state.{AggregatingStateDescriptor, FoldingSta import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.operators.OneInputStreamOperator -import org.apache.flink.streaming.api.scala.function.WindowFunction +import org.apache.flink.streaming.api.operators.{OneInputStreamOperator, OutputTypeConfigurable} +import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.evictors.CountEvictor @@ -367,6 +367,90 @@ class WindowTranslationTest { } @Test + def testReduceWithProcessWindowFunctionEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .reduce( + new DummyReducer, + new ProcessWindowFunction[(String, 
Int), (String, Int), String, TimeWindow] { + override def process( + key: String, + window: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testReduceWithProcessWindowFunctionProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) + .reduce( + new DummyReducer, + new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def process( + key: String, + window: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, 
Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test def testApplyWithPreReducerEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -408,6 +492,50 @@ class WindowTranslationTest { } @Test + def testApplyWithPreReducerAndEvictor() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .evictor(CountEvictor.of(100)) + .apply( + new DummyReducer, + new WindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def apply( + key: String, + window: TimeWindow, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( 
+ winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + + @Test def testReduceWithWindowFunctionEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -584,6 +712,74 @@ class WindowTranslationTest { } @Test + def testAggregateWithProcessWindowFunctionEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .aggregate(new DummyAggregator(), new TestProcessWindowFunction()) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testAggregateWithProcessWindowFunctionProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) + .aggregate(new DummyAggregator(), new 
TestProcessWindowFunction()) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test def testAggregateWithWindowFunctionEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -814,7 +1010,7 @@ class WindowTranslationTest { } @Test - def testApplyWithPreFolderEventTime() { + def testFoldWithProcessWindowFunctionEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -823,16 +1019,15 @@ class WindowTranslationTest { val window1 = source .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) - .apply( + .fold( ("", "", 1), new DummyFolder, - new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] { - override def apply( + new ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow] { + override def process( key: String, - window: TimeWindow, + window: Context, input: Iterable[(String, String, Int)], - out: Collector[(String, String, Int)]): Unit = - input foreach {x => out.collect((x._1, x._2, x._3))} + out: Collector[(String, Int)]): Unit = input 
foreach {x => out.collect((x._1, x._3))} }) val transform = window1 @@ -858,20 +1053,24 @@ class WindowTranslationTest { } @Test - def testFoldWithWindowFunctionEventTimeWithScalaFunction() { + def testFoldWithProcessWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) val window1 = source .keyBy(_._1) - .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) .fold( ("", "", 1), - { (acc: (String, String, Int), _) => acc }, - { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) => - in foreach { x => out.collect((x._1, x._3)) } + new DummyFolder, + new ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow] { + override def process( + key: String, + window: Context, + input: Iterable[(String, String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))} }) val transform = window1 @@ -885,8 +1084,8 @@ class WindowTranslationTest { val winOperator = operator .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] - assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) - assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) processElementAndEnsureOutput[String, (String, Int), (String, Int)]( @@ -896,12 +1095,8 @@ class WindowTranslationTest { ("hello", 1)) } - // -------------------------------------------------------------------------- 
- // apply() tests - // -------------------------------------------------------------------------- - @Test - def testApplyEventTime() { + def testApplyWithPreFolderEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -911,12 +1106,15 @@ class WindowTranslationTest { .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) .apply( - new WindowFunction[(String, Int), (String, Int), String, TimeWindow] { + ("", "", 1), + new DummyFolder, + new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] { override def apply( key: String, window: TimeWindow, - input: Iterable[(String, Int)], - out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + input: Iterable[(String, String, Int)], + out: Collector[(String, String, Int)]): Unit = + input foreach {x => out.collect((x._1, x._2, x._3))} }) val transform = window1 @@ -932,7 +1130,7 @@ class WindowTranslationTest { assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) - assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) processElementAndEnsureOutput[String, (String, Int), (String, Int)]( winOperator, @@ -942,22 +1140,26 @@ class WindowTranslationTest { } @Test - def testApplyProcessingTimeTime() { + def testApplyWithPreFolderAndEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) val window1 = source .keyBy(_._1) - .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + 
.window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .evictor(CountEvictor.of(100)) .apply( - new WindowFunction[(String, Int), (String, Int), String, TimeWindow] { + ("", "", 1), + new DummyFolder, + new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] { override def apply( - key: String, - window: TimeWindow, - input: Iterable[(String, Int)], - out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + key: String, + window: TimeWindow, + input: Iterable[(String, String, Int)], + out: Collector[(String, String, Int)]): Unit = + input foreach {x => out.collect((x._1, x._2, x._3))} }) val transform = window1 @@ -971,8 +1173,8 @@ class WindowTranslationTest { val winOperator = operator .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] - assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) - assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) processElementAndEnsureOutput[String, (String, Int), (String, Int)]( @@ -983,7 +1185,7 @@ class WindowTranslationTest { } @Test - def testApplyEventTimeWithScalaFunction() { + def testFoldWithWindowFunctionEventTimeWithScalaFunction() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -992,9 +1194,12 @@ class WindowTranslationTest { val window1 = source .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) - .apply { (key, window, in, out: Collector[(String, Int)]) => - in foreach { x => out.collect(x)} - } + .fold( + ("", "", 1), + { (acc: (String, String, Int), _) => acc }, + { (_, _, in: Iterable[(String, String, 
Int)], out: Collector[(String, Int)]) => + in foreach { x => out.collect((x._1, x._3)) } + }) val transform = window1 .javaStream @@ -1009,7 +1214,211 @@ class WindowTranslationTest { assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) - assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + // -------------------------------------------------------------------------- + // apply() tests + // -------------------------------------------------------------------------- + + @Test + def testApplyEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .apply( + new WindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def apply( + key: String, + window: TimeWindow, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + 
assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testApplyProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .apply( + new WindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def apply( + key: String, + window: TimeWindow, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testProcessEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100))) + .process( + new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def process( + key: String, + window: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testProcessProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .process( + new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def process( + key: String, + window: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = 
operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testApplyEventTimeWithScalaFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .apply { (key, window, in, out: Collector[(String, Int)]) => + in foreach { x => out.collect(x)} + } + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) processElementAndEnsureOutput[String, (String, Int), (String, Int)]( winOperator, @@ -1132,6 +1541,49 @@ class WindowTranslationTest { } @Test + def testProcessWithCustomTrigger() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + 
.keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .trigger(CountTrigger.of(1)) + .process( + new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def process( + key: String, + window: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + + @Test def testReduceWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -1169,6 +1621,113 @@ class WindowTranslationTest { } @Test + def testReduceWithEvictorAndProcessFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .evictor(CountEvictor.of(100)) + .reduce(new DummyReducer, new TestProcessWindowFunction) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + 
val operator = transform.getOperator + assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[ + EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testAggregateWithEvictor() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .evictor(CountEvictor.of(100)) + .aggregate(new DummyAggregator()) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testAggregateWithEvictorAndProcessFunction() { + val env = 
StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .evictor(CountEvictor.of(100)) + .aggregate(new DummyAggregator(), new TestProcessWindowFunction) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test def testFoldWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -1210,6 +1769,48 @@ class WindowTranslationTest { } @Test + def testFoldWithEvictorAndProcessFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .evictor(CountEvictor.of(100)) + .fold(("", "", 1), new DummyFolder, new TestFoldProcessWindowFunction) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] 
+ + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[ + EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + winOperator.setOutputType( + window1.javaStream.getType.asInstanceOf[TypeInformation[(String, Int)]], + new ExecutionConfig) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + + @Test def testApplyWithEvictor() { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) @@ -1252,6 +1853,48 @@ class WindowTranslationTest { ("hello", 1)) } + @Test + def testProcessWithEvictor() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .evictor(CountEvictor.of(100)) + .process( + new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def process( + key: String, + window: Context, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: 
Window]]) + + val winOperator = operator + .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } /** * Ensure that we get some output from the given operator when pushing in an element and @@ -1266,6 +1909,12 @@ class WindowTranslationTest { val testHarness = new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType) + if (operator.isInstanceOf[OutputTypeConfigurable[String]]) { + // use a dummy type since window functions just need the ExecutionConfig + // this is also only needed for Fold, which we're getting rid of soon. 
+ operator.asInstanceOf[OutputTypeConfigurable[String]] + .setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig) + } testHarness.open() testHarness.setProcessingTime(0) @@ -1319,16 +1968,45 @@ class DummyRichAggregator extends RichAggregateFunction[(String, Int), (String, override def add(value: (String, Int), accumulator: (String, Int)): Unit = () } -class TestWindowFunction extends WindowFunction[(String, Int), (String, Int), String, TimeWindow] { +class TestWindowFunction + extends WindowFunction[(String, Int), (String, String, Int), String, TimeWindow] { override def apply( key: String, window: TimeWindow, input: Iterable[(String, Int)], - out: Collector[(String, Int)]): Unit = { + out: Collector[(String, String, Int)]): Unit = { + + input.foreach(e => out.collect((e._1, e._1, e._2))) + } +} + +class TestProcessWindowFunction + extends ProcessWindowFunction[(String, Int), (String, String, Int), String, TimeWindow] { + + override def process( + key: String, + window: Context, + input: Iterable[(String, Int)], + out: Collector[(String, String, Int)]): Unit = { - input.foreach(out.collect) + input.foreach(e => out.collect((e._1, e._1, e._2))) } } +class TestFoldProcessWindowFunction + extends ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow] { + + override def process( + key: String, + window: Context, + input: Iterable[(String, String, Int)], + out: Collector[(String, Int)]): Unit = { + + input.foreach(e => out.collect((e._1, e._3))) + } +} + + +
