Repository: flink Updated Branches: refs/heads/release-1.2 6aa38ee22 -> 8d3ad4515
http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 299932f..7235b22 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -18,234 +18,1067 @@ package org.apache.flink.streaming.api.scala -import java.util.concurrent.TimeUnit - -import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDescriptor} -import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction, RichFoldFunction, RichReduceFunction} +import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, SlidingProcessingTimeWindows, TumblingEventTimeWindows} -import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor import 
org.apache.flink.streaming.api.windowing.time.Time -import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, ProcessingTimeTrigger} -import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator, EvictingWindowOperator, WindowOperator} -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, EventTimeTrigger, ProcessingTimeTrigger, Trigger} +import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window} +import org.apache.flink.streaming.runtime.operators.windowing._ +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness import org.apache.flink.util.Collector import org.junit.Assert._ -import org.junit.{Ignore, Test} +import org.junit.Test -class WindowTranslationTest extends StreamingMultipleProgramsTestBase { +/** + * These tests verify that the api calls on [[WindowedStream]] instantiate the correct + * window operator. + * + * We also create a test harness and push one element into the operator to verify + * that we get some output. + */ +class WindowTranslationTest { /** - * These tests ensure that the fast aligned time windows operator is used if the - * conditions are right. - */ - @Test - @Ignore - def testFastTimeWindows(): Unit = { + * .reduce() does not support [[RichReduceFunction]], since the reduce function is used + * internally in a [[org.apache.flink.api.common.state.ReducingState]]. 
+ */ @Test(expected = classOf[UnsupportedOperationException]) + def testReduceWithRichReducerFails() { val env = StreamExecutionEnvironment.getExecutionEnvironment - val source = env.fromElements(("hello", 1), ("hello", 2)) - val reducer = new DummyReducer + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) - val window1 = source + source .keyBy(0) - .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)) - .reduce(reducer) + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .reduce(new RichReduceFunction[(String, Int)] { + override def reduce(value1: (String, Int), value2: (String, Int)) = null + }) - val transform1 = window1.javaStream.getTransformation - .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - - val operator1 = transform1.getOperator + fail("exception was not thrown") + } - assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]]) + /** + * .fold() does not support [[RichFoldFunction]], since the fold function is used internally + * in a [[org.apache.flink.api.common.state.FoldingState]]. 
+ */ + @Test(expected = classOf[UnsupportedOperationException]) + def testFoldWithRichFolderFails() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val source = env.fromElements(("hello", 1), ("hello", 2)) - val window2 = source + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + source .keyBy(0) - .timeWindow(Time.minutes(1)) - .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { - def apply( - key: Tuple, + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold(("", 0), new RichFoldFunction[(String, Int), (String, Int)] { + override def fold(accumulator: (String, Int), value: (String, Int)) = null + }) + + fail("exception was not thrown") + } + + @Test + def testSessionWithFoldFails() { + // verify that fold does not work with merging windows + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val windowedStream = env.fromElements("Hello", "Ciao") + .keyBy(x => x) + .window(EventTimeSessionWindows.withGap(Time.seconds(5))) + + try + windowedStream.fold("", new FoldFunction[String, String]() { + @throws[Exception] + def fold(accumulator: String, value: String): String = accumulator + }) + + catch { + case _: UnsupportedOperationException => + // expected + // use a catch to ensure that the exception is thrown by the fold + return + } + + fail("The fold call should fail.") + } + + @Test + def testMergingAssignerWithNonMergingTriggerFails() { + // verify that we check for trigger compatibility + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val windowedStream = env.fromElements("Hello", "Ciao") + .keyBy(x => x) + .window(EventTimeSessionWindows.withGap(Time.seconds(5))) + + try + windowedStream.trigger(new Trigger[String, TimeWindow]() { + def onElement( + element: String, + timestamp: Long, window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } + ctx: Trigger.TriggerContext) = null + + def 
onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = null + + def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = null + + override def canMerge = false + + def clear(window: TimeWindow, ctx: Trigger.TriggerContext) {} }) - val transform2 = window2.javaStream.getTransformation + catch { + case _: UnsupportedOperationException => + // expected + // use a catch to ensure that the exception is thrown by the trigger call + return + } + + fail("The trigger call should fail.") + } + + @Test + def testReduceEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .reduce(new DummyReducer) + + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator2 = transform2.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) - assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]]) + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) } @Test - def testNonEvicting(): Unit = { + def testReduceProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment + 
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) - val reducer = new DummyReducer + val window1 = source + .keyBy(_._1) + .window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .reduce(new DummyReducer) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testReduceEventTimeWithScalaFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) val window1 = source - .keyBy(0) - .window(SlidingEventTimeWindows.of( - Time.of(1, TimeUnit.SECONDS), - Time.of(100, TimeUnit.MILLISECONDS))) - .trigger(CountTrigger.of(100)) - .reduce(reducer) + .keyBy(_._1) + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .reduce( (x, _) => x ) - val transform1 = window1.javaStream.getTransformation + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator1 = transform1.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: 
Window]]) - assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) - val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]] - assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) - assertTrue( - winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) - val window2 = source - .keyBy(0) - .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) - .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { - def apply( - tuple: Tuple, - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testReduceWithWindowFunctionEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) - val transform2 = window2.javaStream.getTransformation + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .reduce( + new DummyReducer, new WindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def apply( + key: String, + window: TimeWindow, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) + + val transform = window1 + .javaStream + 
.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator2 = transform2.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) - assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]]) - val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]] - assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) - assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) } @Test - def testEvicting(): Unit = { + def testReduceWithWindowFunctionProcessingTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val source = env.fromElements(("hello", 1), ("hello", 2)) - val reducer = new DummyReducer + val window1 = source + .keyBy(_._1) + .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) + .reduce( + new DummyReducer, new WindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def apply( + key: String, + window: TimeWindow, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = 
transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testApplyWithPreReducerEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) val window1 = source - .keyBy(0) - .window(SlidingProcessingTimeWindows.of( - Time.of(1, TimeUnit.SECONDS), - Time.of(100, TimeUnit.MILLISECONDS))) - .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) - .reduce(reducer) + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply( + new DummyReducer, new WindowFunction[(String, Int), (String, Int), String, TimeWindow] { + override def apply( + key: String, + window: TimeWindow, + input: Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) + }) - val transform1 = window1.javaStream.getTransformation + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator1 = transform1.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) - assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]]) - val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]] - 
assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger]) - assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]]) - assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) - assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) - val window2 = source - .keyBy(0) - .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) - .evictor(CountEvictor.of(1000)) - .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { - def apply( - tuple: Tuple, - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testReduceWithWindowFunctionEventTimeWithScalaFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) - val transform2 = window2.javaStream.getTransformation + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .reduce( + { (x, _) => x }, + { (_, _, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} }) + + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator2 = transform2.getOperator + val operator = transform.getOperator + 
assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] - assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]]) - val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]] - assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]]) - assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) - assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) } + @Test - def testPreReduce(): Unit = { + def testFoldEventTime() { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val source = env.fromElements(("hello", 1), ("hello", 2)) - val reducer = new DummyReducer + val window1 = source + .keyBy(_._1) + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold(("", "", 1), new DummyFolder) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + 
assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testFoldProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) val window1 = source - .keyBy(0) - .window(SlidingEventTimeWindows.of( - Time.of(1, TimeUnit.SECONDS), - Time.of(100, TimeUnit.MILLISECONDS))) - .trigger(CountTrigger.of(100)) - .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { - def apply( - tuple: Tuple, - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) + .keyBy(_._1) + .window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold(("", "", 1), new DummyFolder) - val transform1 = window1.javaStream.getTransformation + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator1 = transform1.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) - assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) - val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]] - assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) - assertTrue( - winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + 
assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) - val window2 = source - .keyBy(0) - .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .trigger(CountTrigger.of(100)) - .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { - def apply( - tuple: Tuple, - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } - val transform2 = window2.javaStream.getTransformation + @Test + def testFoldEventTimeWithScalaFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold(("", "", 1)) { (acc, _) => acc } + + val transform = window1 + .javaStream + .getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - val operator2 = transform2.getOperator + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] - assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]]) - val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]] - assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) - assertTrue( - 
winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + + @Test + def testFoldWithWindowFunctionEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold( + ("", "", 1), + new DummyFolder, + new WindowFunction[(String, String, Int), (String, Int), String, TimeWindow] { + override def apply( + key: String, + window: TimeWindow, + input: Iterable[(String, String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def 
testFoldWithWindowFunctionProcessingTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold( + ("", "", 1), + new DummyFolder, + new WindowFunction[(String, String, Int), (String, Int), String, TimeWindow] { + override def apply( + key: String, + window: TimeWindow, + input: Iterable[(String, String, Int)], + out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testApplyWithPreFolderEventTime() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .apply( + ("", "", 1), + new DummyFolder, + new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] { + override def apply( + key: 
String, + window: TimeWindow, + input: Iterable[(String, String, Int)], + out: Collector[(String, String, Int)]): Unit = + input foreach {x => out.collect((x._1, x._2, x._3))} + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + } + + @Test + def testFoldWithWindowFunctionEventTimeWithScalaFunction() { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val window1 = source + .keyBy(_._1) + .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) + .fold( + ("", "", 1), + { (acc: (String, String, Int), _) => acc }, + { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) => + in foreach { x => out.collect((x._1, x._3)) } + }) + + val transform = window1 + .javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator = transform.getOperator + assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]]) + + val winOperator = operator + .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]] + + assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger]) + 
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) + assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]]) + + processElementAndEnsureOutput[String, (String, Int), (String, Int)]( + winOperator, + winOperator.getKeySelector, + BasicTypeInfo.STRING_TYPE_INFO, + ("hello", 1)) + }

  /**
   * Applies a `WindowFunction` on tumbling event-time windows and checks that the resulting
   * transformation holds a `WindowOperator` with an `EventTimeTrigger`, a
   * `TumblingEventTimeWindows` assigner and a `ListStateDescriptor`, and that the operator
   * emits output for a single element.
   */
  @Test
  def testApplyEventTime(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .apply(
        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          override def apply(
              key: String,
              window: TimeWindow,
              input: Iterable[(String, Int)],
              out: Collector[(String, Int)]): Unit = {
            input.foreach(x => out.collect((x._1, x._2)))
          }
        })

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * Applies a `WindowFunction` on tumbling processing-time windows and checks that the
   * resulting `WindowOperator` uses a `ProcessingTimeTrigger`, a
   * `TumblingProcessingTimeWindows` assigner and a `ListStateDescriptor`, and that the
   * operator emits output for a single element.
   */
  @Test
  def testApplyProcessingTimeTime(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .apply(
        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          override def apply(
              key: String,
              window: TimeWindow,
              input: Iterable[(String, Int)],
              out: Collector[(String, Int)]): Unit = {
            input.foreach(x => out.collect((x._1, x._2)))
          }
        })

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * Same checks as the event-time `apply` test, but the window function is passed as a Scala
   * closure instead of a `WindowFunction` instance.
   */
  @Test
  def testApplyEventTimeWithScalaFunction(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .apply { (key, window, in, out: Collector[(String, Int)]) =>
        in foreach { x => out.collect(x)}
      }

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * Reduces sliding event-time windows with an explicit `CountTrigger` and checks that the
   * resulting `WindowOperator` carries that trigger, a `SlidingEventTimeWindows` assigner and
   * a `ReducingStateDescriptor`, and that the operator emits output for a single element.
   */
  @Test
  def testReduceWithCustomTrigger(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .keyBy(_._1)
      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .trigger(CountTrigger.of(1))
      .reduce(new DummyReducer)

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * Folds sliding event-time windows with an explicit `CountTrigger` and checks that the
   * resulting `WindowOperator` carries that trigger, a `SlidingEventTimeWindows` assigner and
   * a `FoldingStateDescriptor`, and that the operator emits output for a single element.
   *
   * The fold produces `(String, String, Int)` (the accumulator type of `DummyFolder`), so the
   * casts below use that as the output type.
   */
  @Test
  def testFoldWithCustomTrigger(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .keyBy(_._1)
      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .trigger(CountTrigger.of(1))
      .fold(("", "", 1), new DummyFolder)

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])

    processElementAndEnsureOutput[String, (String, Int), (String, String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * Applies a `WindowFunction` on tumbling event-time windows with an explicit `CountTrigger`
   * and checks that the resulting `WindowOperator` carries that trigger, a
   * `TumblingEventTimeWindows` assigner and a `ListStateDescriptor`, and that the operator
   * emits output for a single element.
   */
  @Test
  def testApplyWithCustomTrigger(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .trigger(CountTrigger.of(1))
      .apply(
        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          override def apply(
              key: String,
              window: TimeWindow,
              input: Iterable[(String, Int)],
              out: Collector[(String, Int)]): Unit = {
            input.foreach(x => out.collect((x._1, x._2)))
          }
        })

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * Reduces sliding event-time windows with a `CountEvictor` and checks that the translation
   * yields an `EvictingWindowOperator` with an `EventTimeTrigger`, the `CountEvictor`, a
   * `SlidingEventTimeWindows` assigner and a `ListStateDescriptor`, and that the operator
   * emits output for a single element.
   */
  @Test
  def testReduceWithEvictor(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .keyBy(_._1)
      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .evictor(CountEvictor.of(100))
      .reduce(new DummyReducer)

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[
        EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * Folds sliding event-time windows with a `CountEvictor` and checks that the translation
   * yields an `EvictingWindowOperator` with an `EventTimeTrigger`, the `CountEvictor`, a
   * `SlidingEventTimeWindows` assigner and a `ListStateDescriptor`, and that the operator
   * emits output for a single element.
   *
   * The fold produces `(String, String, Int)` (the accumulator type of `DummyFolder`), so the
   * casts below use that as the output type.
   */
  @Test
  def testFoldWithEvictor(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .keyBy(_._1)
      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .evictor(CountEvictor.of(100))
      .fold(("", "", 1), new DummyFolder)

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[
        EvictingWindowOperator[String, (String, Int), (String, String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    // Hand the operator the stream's output type before it is opened by the test harness.
    winOperator.setOutputType(
      window1.javaStream.getType.asInstanceOf[TypeInformation[(String, String, Int)]],
      new ExecutionConfig)

    processElementAndEnsureOutput[String, (String, Int), (String, String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }

  /**
   * Applies a `WindowFunction` on tumbling event-time windows with a `CountEvictor` and checks
   * that the translation yields an `EvictingWindowOperator` with an `EventTimeTrigger`, the
   * `CountEvictor`, a `TumblingEventTimeWindows` assigner and a `ListStateDescriptor`, and
   * that the operator emits output for a single element.
   */
  @Test
  def testApplyWithEvictor(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val source = env.fromElements(("hello", 1), ("hello", 2))

    val window1 = source
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
      .evictor(CountEvictor.of(100))
      .apply(
        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          override def apply(
              key: String,
              window: TimeWindow,
              input: Iterable[(String, Int)],
              out: Collector[(String, Int)]): Unit = {
            input.foreach(x => out.collect((x._1, x._2)))
          }
        })

    val transform = window1
      .javaStream
      .getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator
    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])

    val winOperator = operator
      .asInstanceOf[
        EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]

    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])

    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
      winOperator,
      winOperator.getKeySelector,
      BasicTypeInfo.STRING_TYPE_INFO,
      ("hello", 1))
  }


  /**
   * Ensure that we get some output from the given operator when pushing in an element and
   * setting watermark and processing time to `Long.MaxValue`.
   *
   * The harness is closed in a `finally` block so that it is released even when one of the
   * processing steps or the final assertion fails.
   *
   * @param operator    operator under test, wrapped in a keyed test harness
   * @param keySelector key selector handed to the test harness
   * @param keyType     type information of the key, handed to the test harness
   * @param element     the single element pushed through the operator with timestamp 0
   */
  @throws[Exception]
  private def processElementAndEnsureOutput[K, IN, OUT](
      operator: OneInputStreamOperator[IN, OUT],
      keySelector: KeySelector[IN, K],
      keyType: TypeInformation[K],
      element: IN): Unit = {
    val testHarness =
      new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)

    testHarness.open()

    try {
      testHarness.setProcessingTime(0)
      testHarness.processWatermark(Long.MinValue)

      testHarness.processElement(new StreamRecord[IN](element, 0))

      // provoke any processing-time/event-time triggers
      testHarness.setProcessingTime(Long.MaxValue)
      testHarness.processWatermark(Long.MaxValue)

      // we at least get the two watermarks and should also see an output element
      val emitted = testHarness.getOutput.size
      assertTrue(
        s"Expected the two watermarks plus at least one output element, but got $emitted",
        emitted >= 3)
    } finally {
      testHarness.close()
    }
  }
}

/** Reduce function used by the translation tests; it always keeps the first value. */
class DummyReducer extends ReduceFunction[(String, Int)] {
  override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
    value1
  }
}

/**
 * Fold function used by the translation tests; it ignores the incoming element and returns
 * the accumulator unchanged, so its result type is `(String, String, Int)`.
 */
class DummyFolder extends FoldFunction[(String, Int), (String, String, Int)] {
  override def fold(acc: (String, String, Int), in: (String, Int)): (String, String, Int) = {
    acc
  }
}
