[FLINK-5237] Consolidate and harmonize Window Translation Tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d3ad451
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d3ad451
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d3ad451

Branch: refs/heads/release-1.2
Commit: 8d3ad451564ce2f0a53734695e245d85ede62ae8
Parents: ac2c58c
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Nov 24 08:14:48 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Jan 11 15:47:30 2017 +0100

----------------------------------------------------------------------
 .../windowing/AllWindowTranslationTest.java     |  841 ++++++++++---
 .../windowing/TimeWindowTranslationTest.java    |  112 +-
 .../windowing/WindowTranslationTest.java        |  844 ++++++++++---
 flink-streaming-scala/pom.xml                   |    8 +
 .../api/scala/AllWindowTranslationTest.scala    | 1086 ++++++++++++++---
 .../api/scala/TimeWindowTranslationTest.scala   |  182 +++
 .../api/scala/WindowTranslationTest.scala       | 1131 +++++++++++++++---
 7 files changed, 3561 insertions(+), 643 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 72b0850..3d4de5d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,121 +17,548 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFoldFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.AllWindowedStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} 
instantiate
- * the correct window operator.
+ * These tests verify that the api calls on {@link AllWindowedStream} 
instantiate the correct
+ * window operator.
+ *
+ * <p>We also create a test harness and push one element into the operator to 
verify
+ * that we get some output.
  */
+@SuppressWarnings("serial")
 public class AllWindowTranslationTest {
 
        /**
-        * These tests ensure that the correct trigger is set when using 
event-time windows.
+        * .reduce() does not support RichReduceFunction, since the reduce 
function is used internally
+        * in a {@code ReducingState}.
         */
-       @Test
-       @SuppressWarnings("rawtypes")
-       public void testEventTime() throws Exception {
+       @Test(expected = UnsupportedOperationException.class)
+       public void testReduceWithRichReducerFails() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               source
+                               
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .reduce(new RichReduceFunction<Tuple2<String, 
Integer>>() {
+                                       private static final long 
serialVersionUID = -6448847205314995812L;
+
+                                       @Override
+                                       public Tuple2<String, Integer> 
reduce(Tuple2<String, Integer> value1,
+                                                       Tuple2<String, Integer> 
value2) throws Exception {
+                                               return null;
+                                       }
+                               });
+
+               fail("exception was not thrown");
+       }
+
+       /**
+        * .fold() does not support RichFoldFunction, since the fold function 
is used internally
+        * in a {@code FoldingState}.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testFoldWithRichFolderFails() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               source
+                               
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .fold(new Tuple2<>("", 0), new 
RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+                                       private static final long 
serialVersionUID = -6448847205314995812L;
+
+                                       @Override
+                                       public Tuple2<String, Integer> 
fold(Tuple2<String, Integer> value1,
+                                                       Tuple2<String, Integer> 
value2) throws Exception {
+                                               return null;
+                                       }
+                               });
+
+               fail("exception was not thrown");
+       }
+
+
+       @Test
+       public void testSessionWithFoldFails() throws Exception {
+               // verify that fold does not work with merging windows
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               AllWindowedStream<String, TimeWindow> windowedStream = 
env.fromElements("Hello", "Ciao")
+                               
.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+               try {
+                       windowedStream.fold("", new FoldFunction<String, 
String>() {
+                               private static final long serialVersionUID = 
-4567902917104921706L;
+
+                               @Override
+                               public String fold(String accumulator, String 
value) throws Exception {
+                                       return accumulator;
+                               }
+                       });
+               } catch (UnsupportedOperationException e) {
+                       // expected
+                       // use a catch to ensure that the exception is thrown 
by the fold
+                       return;
+               }
+
+               fail("The fold call should fail.");
+       }
+
+       @Test
+       public void testMergingAssignerWithNonMergingTriggerFails() throws 
Exception {
+               // verify that we check for trigger compatibility
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               AllWindowedStream<String, TimeWindow> windowedStream = 
env.fromElements("Hello", "Ciao")
+                               
.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+               try {
+                       windowedStream.trigger(new Trigger<String, 
TimeWindow>() {
+                               private static final long serialVersionUID = 
6558046711583024443L;
+
+                               @Override
+                               public TriggerResult onElement(String element,
+                                               long timestamp,
+                                               TimeWindow window,
+                                               TriggerContext ctx) throws 
Exception {
+                                       return null;
+                               }
+
+                               @Override
+                               public TriggerResult onProcessingTime(long time,
+                                               TimeWindow window,
+                                               TriggerContext ctx) throws 
Exception {
+                                       return null;
+                               }
+
+                               @Override
+                               public TriggerResult onEventTime(long time,
+                                               TimeWindow window,
+                                               TriggerContext ctx) throws 
Exception {
+                                       return null;
+                               }
+
+                               @Override
+                               public boolean canMerge() {
+                                       return false;
+                               }
+
+                               @Override
+                               public void clear(TimeWindow window, 
TriggerContext ctx) throws Exception {}
+                       });
+               } catch (UnsupportedOperationException e) {
+                       // expected
+                       // use a catch to ensure that the exception is thrown 
by the fold
+                       return;
+               }
+
+               fail("The trigger call should fail.");
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DummyReducer reducer = new DummyReducer();
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
                DataStream<Tuple2<String, Integer>> window1 = source
                                
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
-                               .reduce(reducer);
+                               .reduce(new DummyReducer());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceProcessingTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .reduce(new DummyReducer());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+
+       /**
+        * Ignored because we currently don't have the fast processing-time 
window operator.
+        */
+       @Test
+       @SuppressWarnings("rawtypes")
+       @Ignore
+       public void testReduceFastProcessingTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .reduce(new DummyReducer());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof 
AggregatingProcessingTimeWindowOperator);
+
+               processElementAndEnsureOutput(operator, null, 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithWindowFunctionEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
-               Assert.assertTrue(operator1 instanceof WindowOperator);
-               WindowOperator winOperator1 = (WindowOperator) operator1;
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
-               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
 
-               DataStream<Tuple2<String, Integer>> window2 = source
+               DataStream<Tuple3<String, String, Integer>> window = source
                                
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
+                               .reduce(reducer, new 
AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, 
TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
                                        public void apply(
                                                        TimeWindow window,
                                                        Iterable<Tuple2<String, 
Integer>> values,
-                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
-
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
                                        }
                                });
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof WindowOperator);
-               WindowOperator winOperator2 = (WindowOperator) operator2;
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
-               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
        }
 
        @Test
        @SuppressWarnings("rawtypes")
-       public void testNonEvicting() throws Exception {
+       public void testReduceWithWindowFunctionProcessingTime() throws 
Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .reduce(new DummyReducer(), new 
AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       /**
+        * Test for the deprecated .apply(Reducer, WindowFunction).
+        */
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithPreReducerEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
                DummyReducer reducer = new DummyReducer();
 
-               DataStream<Tuple2<String, Integer>> window1 = source
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .apply(reducer, new 
AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window1 = source
                                
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
-                               .trigger(CountTrigger.of(100))
-                               .reduce(reducer);
+                               .fold(new Tuple3<>("", "", 1), new 
DummyFolder());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldProcessingTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .fold(new Tuple3<>("", "", 0), new 
DummyFolder());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
-               Assert.assertTrue(operator1 instanceof WindowOperator);
-               WindowOperator winOperator1 = (WindowOperator) operator1;
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
-               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String,  Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
 
-               DataStream<Tuple2<String, Integer>> window2 = source
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldWithWindowFunctionEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .fold(new Tuple3<>("", "", 0), new 
DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(
+                                                       TimeWindow window,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldWithWindowFunctionProcessingTime() throws Exception 
{
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .fold(new Tuple3<>("", "empty", 0), new 
DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(
+                                                       TimeWindow window,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithPreFolderEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .apply(new Tuple3<>("", "", 0), new 
DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, 
Tuple3<String, String, Integer>, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(
+                                                       TimeWindow window,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f1, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
                                
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-                               .trigger(CountTrigger.of(100))
                                .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
@@ -140,24 +567,65 @@ public class AllWindowTranslationTest {
                                                        TimeWindow window,
                                                        Iterable<Tuple2<String, 
Integer>> values,
                                                        
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyProcessingTimeTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
 
+                                       @Override
+                                       public void apply(
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
                                        }
                                });
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof WindowOperator);
-               WindowOperator winOperator2 = (WindowOperator) operator2;
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
-               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+
        }
 
+
        @Test
        @SuppressWarnings("rawtypes")
-       public void testEvicting() throws Exception {
+       public void testReduceWithCustomTrigger() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
@@ -165,22 +633,56 @@ public class AllWindowTranslationTest {
 
                DataStream<Tuple2<String, Integer>> window1 = source
                                
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
-                               .evictor(CountEvictor.of(100))
+                               .trigger(CountTrigger.of(1))
                                .reduce(reducer);
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
-               Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
-               EvictingWindowOperator winOperator1 = (EvictingWindowOperator) 
operator1;
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
-               Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
-               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ListStateDescriptor);
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
 
-               DataStream<Tuple2<String, Integer>> window2 = source
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldWithCustomTrigger() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window1 = source
+                               
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .fold(new Tuple3<>("", "", 1), new 
DummyFolder());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithCustomTrigger() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
                                
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-                               .trigger(CountTrigger.of(100))
-                               .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
+                               .trigger(CountTrigger.of(1))
                                .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
@@ -189,136 +691,146 @@ public class AllWindowTranslationTest {
                                                        TimeWindow window,
                                                        Iterable<Tuple2<String, 
Integer>> values,
                                                        
Collector<Tuple2<String, Integer>> out) throws Exception {
-
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
                                        }
                                });
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
-               EvictingWindowOperator winOperator2 = (EvictingWindowOperator) 
operator2;
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
-               Assert.assertTrue(winOperator2.getEvictor() instanceof 
TimeEvictor);
-               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
        }
 
-       /**
-        * These tests ensure that a Fold buffer is used if possible
-        */
        @Test
        @SuppressWarnings("rawtypes")
-       public void testFoldBuffer() throws Exception {
+       public void testReduceWithEvictor() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DummyFolder folder = new DummyFolder();
+               DummyReducer reducer = new DummyReducer();
 
-               DataStream<Integer> window1 = source
+               DataStream<Tuple2<String, Integer>> window1 = source
                                
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
-                               .fold(0, folder);
+                               .evictor(CountEvictor.of(100))
+                               .reduce(reducer);
 
-               OneInputTransformation<Tuple2<String, Integer>, Integer> 
transform1 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) 
window1.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Integer> 
operator1 = transform1.getOperator();
-               Assert.assertTrue(operator1 instanceof WindowOperator);
-               WindowOperator winOperator1 = (WindowOperator) operator1;
-               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
-               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
CountEvictor);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
-               DataStream<Integer> window2 = source
-                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-                               .evictor(CountEvictor.of(13))
-                               .fold(0, folder);
-
-               OneInputTransformation<Tuple2<String, Integer>, Integer> 
transform2 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) 
window2.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Integer> 
operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof WindowOperator);
-               WindowOperator winOperator2 = (WindowOperator) operator2;
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
-               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
        }
 
        @Test
-       public void testSessionWithFold() throws Exception {
-               // verify that fold does not work with merging windows
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       public void testFoldWithEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               StreamExecutionEnvironment env = 
LocalStreamEnvironment.createLocalEnvironment();
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-               AllWindowedStream<String, TimeWindow> windowedStream = 
env.fromElements("Hello", "Ciao")
-                               
.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+               DataStream<Tuple3<String, String, Integer>> window1 = source
+                               
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .fold(new Tuple3<>("", "", 1), new 
DummyFolder());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
CountEvictor);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               winOperator.setOutputType((TypeInformation) window1.getType(), 
new ExecutionConfig());
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
 
-               try {
-                       windowedStream.fold("", new FoldFunction<String, 
String>() {
-                               private static final long serialVersionUID = 
-8722899157560218917L;
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-                               @Override
-                               public String fold(String accumulator, String 
value) throws Exception {
-                                       return accumulator;
-                               }
-                       });
-               } catch (UnsupportedOperationException e) {
-                       // expected
-                       // use a catch to ensure that the exception is thrown 
by the fold
-                       return;
-               }
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-               fail("The fold call should fail.");
-       }
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
+                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
 
-       @Test
-       public void testMergingAssignerWithNonMergingTrigger() throws Exception 
{
-               // verify that we check for trigger compatibility
+                                       @Override
+                                       public void apply(
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
 
-               StreamExecutionEnvironment env = 
LocalStreamEnvironment.createLocalEnvironment();
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
TimeEvictor);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
-               AllWindowedStream<String, TimeWindow> windowedStream = 
env.fromElements("Hello", "Ciao")
-                               
.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
 
-               try {
-                       windowedStream.trigger(new Trigger<String, 
TimeWindow>() {
-                               private static final long serialVersionUID = 
8360971631424870421L;
+       /**
+        * Ensure that we get some output from the given operator when pushing 
in an element and
+        * setting watermark and processing time to {@code Long.MAX_VALUE}.
+        */
+       private static <K, IN, OUT> void processElementAndEnsureOutput(
+                       OneInputStreamOperator<IN, OUT> operator,
+                       KeySelector<IN, K> keySelector,
+                       TypeInformation<K> keyType,
+                       IN element) throws Exception {
 
-                               @Override
-                               public TriggerResult onElement(String element,
-                                               long timestamp,
-                                               TimeWindow window,
-                                               TriggerContext ctx) throws 
Exception {
-                                       return null;
-                               }
+               KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               keySelector,
+                                               keyType);
 
-                               @Override
-                               public TriggerResult onProcessingTime(long time,
-                                               TimeWindow window,
-                                               TriggerContext ctx) throws 
Exception {
-                                       return null;
-                               }
+               testHarness.open();
 
-                               @Override
-                               public TriggerResult onEventTime(long time,
-                                               TimeWindow window,
-                                               TriggerContext ctx) throws 
Exception {
-                                       return null;
-                               }
+               testHarness.setProcessingTime(0);
+               testHarness.processWatermark(Long.MIN_VALUE);
 
-                               @Override
-                               public boolean canMerge() {
-                                       return false;
-                               }
+               testHarness.processElement(new StreamRecord<>(element, 0));
 
-                               @Override
-                               public void clear(TimeWindow window, 
TriggerContext ctx) throws Exception {}
-                       });
-               } catch (UnsupportedOperationException e) {
-                       // expected
-                       // use a catch to ensure that the exception is thrown 
by the fold
-                       return;
-               }
+               // provoke any processing-time/event-time triggers
+               testHarness.setProcessingTime(Long.MAX_VALUE);
+               testHarness.processWatermark(Long.MAX_VALUE);
 
-               fail("The trigger call should fail.");
+               // we at least get the two watermarks and should also see an 
output element
+               assertTrue(testHarness.getOutput().size() >= 3);
+
+               testHarness.close();
        }
 
        // 
------------------------------------------------------------------------
@@ -334,13 +846,12 @@ public class AllWindowTranslationTest {
                }
        }
 
-       public static class DummyFolder implements FoldFunction<Tuple2<String, 
Integer>, Integer> {
-               private static final long serialVersionUID = 1L;
-
+       private static class DummyFolder implements FoldFunction<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>> {
                @Override
-               public Integer fold(Integer accumulator, Tuple2<String, 
Integer> value) throws Exception {
+               public Tuple3<String, String, Integer> fold(
+                               Tuple3<String, String, Integer> accumulator,
+                               Tuple2<String, Integer> value) throws Exception 
{
                        return accumulator;
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index 5aa8151..8e37021 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,9 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -44,9 +46,8 @@ import org.junit.Test;
 import java.util.concurrent.TimeUnit;
 
 /**
- * These tests verify that the api calls on
- * {@link WindowedStream} instantiate
- * the correct window operator.
+ * These tests verify that the api calls on {@link WindowedStream} that use 
the "time" shortcut
+ * instantiate the correct window operator.
  */
 public class TimeWindowTranslationTest {
 
@@ -56,8 +57,9 @@ public class TimeWindowTranslationTest {
         */
        @Test
        @Ignore
-       public void testFastTimeWindows() throws Exception {
+       public void testReduceFastTimeWindows() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
@@ -71,8 +73,21 @@ public class TimeWindowTranslationTest {
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
                Assert.assertTrue(operator1 instanceof 
AggregatingProcessingTimeWindowOperator);
+       }
 
-               DataStream<Tuple2<String, Integer>> window2 = source
+       /**
+        * These tests ensure that the fast aligned time windows operator is 
used if the
+        * conditions are right.
+        */
+       @Test
+       @Ignore
+       public void testApplyFastTimeWindows() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(0)
                                .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS))
                                .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -87,25 +102,25 @@ public class TimeWindowTranslationTest {
                                        }
                                });
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof 
AccumulatingProcessingTimeWindowOperator);
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof 
AccumulatingProcessingTimeWindowOperator);
        }
 
        @Test
        @SuppressWarnings("rawtypes")
-       public void testEventTimeWindows() throws Exception {
+       public void testReduceEventTimeWindows() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-               DummyReducer reducer = new DummyReducer();
+               DataStream<Tuple2<String, Integer>> source = env.fromElements(
+                               Tuple2.of("hello", 1),
+                               Tuple2.of("hello", 2));
 
                DataStream<Tuple2<String, Integer>> window1 = source
-                       .keyBy(0)
-                       .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), 
Time.of(100, TimeUnit.MILLISECONDS))
-                       .reduce(reducer);
+                               .keyBy(0)
+                               .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+                               .reduce(new DummyReducer());
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
@@ -114,8 +129,43 @@ public class TimeWindowTranslationTest {
                Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
                Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+       }
 
-               DataStream<Tuple2<String, Integer>> window2 = source
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldEventTimeWindows() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = env.fromElements(
+                               Tuple2.of("hello", 1),
+                               Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+                               .fold(new Tuple2<>("", 1), new DummyFolder());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof WindowOperator);
+               WindowOperator winOperator1 = (WindowOperator) operator1;
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyEventTimeWindows() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = env.fromElements(
+                               Tuple2.of("hello", 1),
+                               Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
                        .keyBy(0)
                        .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
                        .apply(new WindowFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -130,14 +180,13 @@ public class TimeWindowTranslationTest {
                                }
                        });
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof WindowOperator);
-               WindowOperator winOperator2 = (WindowOperator) operator2;
-               Assert.assertTrue(winOperator2.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
-               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
-
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof WindowOperator);
+               WindowOperator winOperator1 = (WindowOperator) operator1;
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ListStateDescriptor);
        }
 
        /**
@@ -187,7 +236,7 @@ public class TimeWindowTranslationTest {
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class DummyReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
+       private static class DummyReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -195,4 +244,15 @@ public class TimeWindowTranslationTest {
                        return value1;
                }
        }
+
+       private static class DummyFolder
+                       implements FoldFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<String, Integer> fold(Tuple2<String, Integer> 
value1, Tuple2<String, Integer> value2) throws Exception {
+                       return value1;
+               }
+       }
+
 }

Reply via email to