http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 66de849..492d275 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,46 +17,59 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFoldFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * These tests verify that the api calls on
- * {@link WindowedStream} instantiate
- * the correct window operator.
+ * These tests verify that the api calls on {@link WindowedStream} instantiate 
the correct
+ * window operator.
+ *
+ * <p>We also create a test harness and push one element into the operator to 
verify
+ * that we get some output.
  */
 @SuppressWarnings("serial")
 public class WindowTranslationTest {
@@ -66,13 +79,13 @@ public class WindowTranslationTest {
         * in a {@code ReducingState}.
         */
        @Test(expected = UnsupportedOperationException.class)
-       public void testReduceFailWithRichReducer() throws Exception {
+       public void testReduceWithRichReducerFails() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               DataStream<Tuple2<String, Integer>> window1 = source
+               source
                        .keyBy(0)
                        .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                        .reduce(new RichReduceFunction<Tuple2<String, 
Integer>>() {
@@ -84,163 +97,43 @@ public class WindowTranslationTest {
                                        return null;
                                }
                        });
+
+               fail("exception was not thrown");
        }
 
        /**
-        * These tests ensure that the correct trigger is set when using 
event-time windows.
+        * .fold() does not support RichFoldFunction, since the fold function 
is used internally
+        * in a {@code FoldingState}.
         */
-       @Test
-       @SuppressWarnings("rawtypes")
-       public void testEventTime() throws Exception {
+       @Test(expected = UnsupportedOperationException.class)
+       public void testFoldWithRichFolderFails() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-               DummyReducer reducer = new DummyReducer();
-
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(0)
-                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-                               .reduce(reducer);
-
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
-               Assert.assertTrue(operator1 instanceof WindowOperator);
-               WindowOperator winOperator1 = (WindowOperator) operator1;
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
-               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ReducingStateDescriptor);
-
-               DataStream<Tuple2<String, Integer>> window2 = source
-                               .keyBy(0)
-                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-                                       private static final long 
serialVersionUID = 1L;
-
-                                       @Override
-                                       public void apply(Tuple tuple,
-                                                       TimeWindow window,
-                                                       Iterable<Tuple2<String, 
Integer>> values,
-                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
-                                       }
-                               });
-
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof WindowOperator);
-               WindowOperator winOperator2 = (WindowOperator) operator2;
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
-               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
-       }
-
-       @Test
-       @SuppressWarnings("rawtypes")
-       public void testNonEvicting() throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-               DummyReducer reducer = new DummyReducer();
-
-               DataStream<Tuple2<String, Integer>> window1 = source
+               source
                                .keyBy(0)
                                .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-                               .trigger(CountTrigger.of(100))
-                               .reduce(reducer);
-
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
-               Assert.assertTrue(operator1 instanceof WindowOperator);
-               WindowOperator winOperator1 = (WindowOperator) operator1;
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
-               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ReducingStateDescriptor);
-
-               DataStream<Tuple2<String, Integer>> window2 = source
-                               .keyBy(0)
-                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
-                               .trigger(CountTrigger.of(100))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-                                       private static final long 
serialVersionUID = 1L;
+                               .fold(new Tuple2<>("", 0), new 
RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+                                       private static final long 
serialVersionUID = -6448847205314995812L;
 
                                        @Override
-                                       public void apply(Tuple tuple,
-                                                       TimeWindow window,
-                                                       Iterable<Tuple2<String, 
Integer>> values,
-                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
-
+                                       public Tuple2<String, Integer> 
fold(Tuple2<String, Integer> value1,
+                                                       Tuple2<String, Integer> 
value2) throws Exception {
+                                               return null;
                                        }
                                });
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof WindowOperator);
-               WindowOperator winOperator2 = (WindowOperator) operator2;
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
-               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
+               fail("exception was not thrown");
        }
 
-       @Test
-       @SuppressWarnings("rawtypes")
-       public void testEvicting() throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-               DummyReducer reducer = new DummyReducer();
-
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(0)
-                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-                               .evictor(CountEvictor.of(100))
-                               .reduce(reducer);
-
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
-               Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
-               EvictingWindowOperator winOperator1 = (EvictingWindowOperator) 
operator1;
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
-               Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
-               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ListStateDescriptor);
-
-               DataStream<Tuple2<String, Integer>> window2 = source
-                               .keyBy(0)
-                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
-                               .trigger(CountTrigger.of(100))
-                               .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-                                       private static final long 
serialVersionUID = 1L;
-
-                                       @Override
-                                       public void apply(Tuple tuple,
-                                                       TimeWindow window,
-                                                       Iterable<Tuple2<String, 
Integer>> values,
-                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
-
-                                       }
-                               });
-
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
-               EvictingWindowOperator winOperator2 = (EvictingWindowOperator) 
operator2;
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
-               Assert.assertTrue(winOperator2.getEvictor() instanceof 
TimeEvictor);
-               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
-       }
 
        @Test
-       public void testSessionWithFold() throws Exception {
+       public void testSessionWithFoldFails() throws Exception {
                // verify that fold does not work with merging windows
 
-               StreamExecutionEnvironment env = 
LocalStreamEnvironment.createLocalEnvironment();
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                WindowedStream<String, String, TimeWindow> windowedStream = 
env.fromElements("Hello", "Ciao")
                                .keyBy(new KeySelector<String, String>() {
@@ -272,10 +165,10 @@ public class WindowTranslationTest {
        }
 
        @Test
-       public void testMergingAssignerWithNonMergingTrigger() throws Exception 
{
+       public void testMergingAssignerWithNonMergingTriggerFails() throws 
Exception {
                // verify that we check for trigger compatibility
 
-               StreamExecutionEnvironment env = 
LocalStreamEnvironment.createLocalEnvironment();
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                WindowedStream<String, String, TimeWindow> windowedStream = 
env.fromElements("Hello", "Ciao")
                                .keyBy(new KeySelector<String, String>() {
@@ -331,7 +224,649 @@ public class WindowTranslationTest {
                fail("The trigger call should fail.");
        }
 
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .reduce(new DummyReducer());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceProcessingTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .reduce(new DummyReducer());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+
+       /**
+        * Ignored because we currently don't have the fast processing-time 
window operator.
+        */
+       @Test
+       @SuppressWarnings("rawtypes")
+       @Ignore
+       public void testReduceFastProcessingTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .reduce(new DummyReducer());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof 
AggregatingProcessingTimeWindowOperator);
+
+               processElementAndEnsureOutput(operator, null, 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithWindowFunctionEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .reduce(reducer, new 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, 
String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithWindowFunctionProcessingTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .reduce(new DummyReducer(), new 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, 
String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String tuple,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       /**
+        * Test for the deprecated .apply(Reducer, WindowFunction).
+        */
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithPreReducerEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .apply(reducer, new 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, 
String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .fold(new Tuple3<>("", "", 1), new 
DummyFolder());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldProcessingTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .fold(new Tuple3<>("", "", 0), new 
DummyFolder());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String,  Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldWithWindowFunctionEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .fold(new Tuple3<>("", "", 0), new 
DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldWithWindowFunctionProcessingTime() throws Exception 
{
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .fold(new Tuple3<>("", "empty", 0), new 
DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithPreFolderEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .apply(new Tuple3<>("", "", 0), new 
DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in. f1, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, 
Tuple3<String,String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyProcessingTimeTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithCustomTrigger() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .reduce(reducer);
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldWithCustomTrigger() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .fold(new Tuple3<>("", "", 1), new 
DummyFolder());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithCustomTrigger() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .reduce(reducer);
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
CountEvictor);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       public void testFoldWithEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .fold(new Tuple3<>("", "", 1), new 
DummyFolder());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
CountEvictor);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               winOperator.setOutputType((TypeInformation) window1.getType(), 
new ExecutionConfig());
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
TimeEvictor);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       /**
+        * Ensure that we get some output from the given operator when pushing 
in an element and
+        * setting watermark and processing time to {@code Long.MAX_VALUE}.
+        */
+       private static <K, IN, OUT> void processElementAndEnsureOutput(
+                       OneInputStreamOperator<IN, OUT> operator,
+                       KeySelector<IN, K> keySelector,
+                       TypeInformation<K> keyType,
+                       IN element) throws Exception {
+
+               KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               keySelector,
+                                               keyType);
+
+               testHarness.open();
+
+               testHarness.setProcessingTime(0);
+               testHarness.processWatermark(Long.MIN_VALUE);
+
+               testHarness.processElement(new StreamRecord<>(element, 0));
+
+               // provoke any processing-time/event-time triggers
+               testHarness.setProcessingTime(Long.MAX_VALUE);
+               testHarness.processWatermark(Long.MAX_VALUE);
 
+               // we at least get the two watermarks and should also see an 
output element
+               assertTrue(testHarness.getOutput().size() >= 3);
+
+               testHarness.close();
+       }
 
        // 
------------------------------------------------------------------------
        //  UDFs
@@ -345,4 +880,23 @@ public class WindowTranslationTest {
                        return value1;
                }
        }
+
+       private static class DummyFolder implements FoldFunction<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>> {
+               @Override
+               public Tuple3<String, String, Integer> fold(
+                               Tuple3<String, String, Integer> accumulator,
+                               Tuple2<String, Integer> value) throws Exception 
{
+                       return accumulator;
+               }
+       }
+
+       private static class TupleKeySelector implements 
KeySelector<Tuple2<String, Integer>, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String getKey(Tuple2<String, Integer> value) throws 
Exception {
+                       return value.f0;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index b0cc961..ddc3e05 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -106,6 +106,14 @@ under the License.
                        <type>test-jar</type>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
        </dependencies>
 
        <build>

Reply via email to