http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index b94e530..9d4a41a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -18,20 +18,22 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
@@ -42,57 +44,25 @@ import 
org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-@RunWith(Parameterized.class)
 public class WindowOperatorTest {
 
-       @SuppressWarnings("unchecked,rawtypes")
-       private WindowBufferFactory windowBufferFactory;
-
-       public WindowOperatorTest(WindowBufferFactory<?, ?> 
windowBufferFactory) {
-               this.windowBufferFactory = windowBufferFactory;
-       }
-
        // For counting if close() is called the correct number of times on the 
SumReducer
        private static AtomicInteger closeCalled = new AtomicInteger(0);
 
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testSlidingEventTimeWindows() throws Exception {
-               closeCalled.set(0);
-
-               final int WINDOW_SIZE = 3;
-               final int WINDOW_SLIDE = 1;
-
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, TimeWindow> operator = new WindowOperator<>(
-                               SlidingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               windowBufferFactory,
-                               new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
-                               EventTimeTrigger.create());
-
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(operator);
+       private void 
testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, 
Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
 
                long initialTime = 0L;
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
-               testHarness.open();
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                // add elements out-of-order
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3999));
@@ -148,37 +118,84 @@ public class WindowOperatorTest {
                expectedOutput.add(new Watermark(7999));
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-               testHarness.close();
-               if (windowBufferFactory instanceof 
PreAggregatingHeapWindowBuffer.Factory) {
-                       Assert.assertEquals("Close was not called.", 2, 
closeCalled.get());
-               } else {
-                       Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-               }
        }
 
        @Test
        @SuppressWarnings("unchecked")
-       public void testTumblingEventTimeWindows() throws Exception {
+       public void testSlidingEventTimeWindowsReduce() throws Exception {
                closeCalled.set(0);
 
                final int WINDOW_SIZE = 3;
+               final int WINDOW_SLIDE = 1;
 
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, TimeWindow> operator = new WindowOperator<>(
-                               TumblingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
+                       new SumReducer(),
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+                               SlidingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               windowBufferFactory,
-                               new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
+                               stateDesc,
+                               new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(),
                                EventTimeTrigger.create());
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
+               operator.setInputType(inputType, new ExecutionConfig());
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
OneInputStreamOperatorTestHarness<>(operator);
 
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.open();
+
+               testSlidingEventTimeWindows(testHarness);
+
+               testHarness.close();
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testSlidingEventTimeWindowsApply() throws Exception {
+               closeCalled.set(0);
+
+               final int WINDOW_SIZE = 3;
+               final int WINDOW_SLIDE = 1;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator = new WindowOperator<>(
+                       SlidingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+                       new TimeWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new RichSumReducer<TimeWindow>(),
+                       EventTimeTrigger.create());
+
+               operator.setInputType(inputType, new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.open();
+
+               testSlidingEventTimeWindows(testHarness);
+
+               testHarness.close();
+
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
+       }
+
+       private void 
testTumblingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, 
Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
                long initialTime = 0L;
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -233,13 +250,79 @@ public class WindowOperatorTest {
                expectedOutput.add(new Watermark(7999));
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+       }
+
+               @Test
+       @SuppressWarnings("unchecked")
+       public void testTumblingEventTimeWindowsReduce() throws Exception {
+               closeCalled.set(0);
+
+               final int WINDOW_SIZE = 3;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
+                       new SumReducer(),
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+                               TumblingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               stateDesc,
+                               new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(),
+                               EventTimeTrigger.create());
+
+               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                               new 
OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.open();
+
+               testTumblingEventTimeWindows(testHarness);
 
                testHarness.close();
-               if (windowBufferFactory instanceof 
PreAggregatingHeapWindowBuffer.Factory) {
-                       Assert.assertEquals("Close was not called.", 2, 
closeCalled.get());
-               } else {
-                       Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-               }
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testTumblingEventTimeWindowsApply() throws Exception {
+               closeCalled.set(0);
+
+               final int WINDOW_SIZE = 3;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator = new WindowOperator<>(
+                       TumblingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
+                       new TimeWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new RichSumReducer<TimeWindow>(),
+                       EventTimeTrigger.create());
+
+               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.open();
+
+               testTumblingEventTimeWindows(testHarness);
+
+               testHarness.close();
+
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
        }
 
        @Test
@@ -249,13 +332,19 @@ public class WindowOperatorTest {
 
                final int WINDOW_SIZE = 3;
 
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, GlobalWindow> operator = new WindowOperator<>(
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
+                       new SumReducer(),
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new 
WindowOperator<>(
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               windowBufferFactory,
-                               new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
+                               stateDesc,
+                               new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(),
                                
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -263,6 +352,8 @@ public class WindowOperatorTest {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
OneInputStreamOperatorTestHarness<>(operator);
 
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
                long initialTime = 0L;
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -322,11 +413,6 @@ public class WindowOperatorTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.close();
-               if (windowBufferFactory instanceof 
PreAggregatingHeapWindowBuffer.Factory) {
-                       Assert.assertEquals("Close was not called.", 2, 
closeCalled.get());
-               } else {
-                       Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-               }
        }
 
        @Test
@@ -336,13 +422,19 @@ public class WindowOperatorTest {
 
                final int WINDOW_SIZE = 4;
 
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, GlobalWindow> operator = new WindowOperator<>(
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
+                       new SumReducer(),
+                       inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new 
WindowOperator<>(
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               windowBufferFactory,
-                               new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
+                               stateDesc,
+                               new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(),
                                
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse(
@@ -351,6 +443,8 @@ public class WindowOperatorTest {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
OneInputStreamOperatorTestHarness<>(operator);
 
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
                long initialTime = 0L;
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -387,19 +481,23 @@ public class WindowOperatorTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.close();
-               if (windowBufferFactory instanceof 
PreAggregatingHeapWindowBuffer.Factory) {
-                       Assert.assertEquals("Close was not called.", 2, 
closeCalled.get());
-               } else {
-                       Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-               }
-
        }
 
        // 
------------------------------------------------------------------------
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class SumReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
+       public static class SumReducer implements ReduceFunction<Tuple2<String, 
Integer>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
+                       Tuple2<String, Integer> value2) throws Exception {
+                       return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+               }
+       }
+
+
+       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
String, W> {
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled = false;
@@ -417,24 +515,23 @@ public class WindowOperatorTest {
                }
 
                @Override
-               public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
-                               Tuple2<String, Integer> value2) throws 
Exception {
+               public void apply(String key,
+                       W window,
+                       Iterable<Tuple2<String, Integer>> input,
+                       Collector<Tuple2<String, Integer>> out) throws 
Exception {
+
                        if (!openCalled) {
                                Assert.fail("Open was not called");
                        }
-                       return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+                       int sum = 0;
+
+                       for (Tuple2<String, Integer> t: input) {
+                               sum += t.f1;
+                       }
+                       out.collect(new Tuple2<>(key, sum));
+
                }
-       }
-       // 
------------------------------------------------------------------------
-       //  Parametrization for testing different window buffers
-       // 
------------------------------------------------------------------------
 
-       @Parameterized.Parameters(name = "WindowBuffer = {0}")
-       @SuppressWarnings("unchecked,rawtypes")
-       public static Collection<WindowBufferFactory[]> windowBuffers(){
-               return Arrays.asList(new WindowBufferFactory[]{new 
PreAggregatingHeapWindowBuffer.Factory(new SumReducer())},
-                               new WindowBufferFactory[]{new 
HeapWindowBuffer.Factory()}
-                               );
        }
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 13766a1..1e6e475 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -35,8 +38,6 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
@@ -52,6 +53,29 @@ import java.util.concurrent.TimeUnit;
 public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
        /**
+        * .reduce() does not support RichReduceFunction, since the reduce 
function is used internally
+        * in a {@code ReducingState}.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testReduceFailWithRichReducer() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                       .keyBy(0)
+                       .window(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                       .reduce(new RichReduceFunction<Tuple2<String, 
Integer>>() {
+                               @Override
+                               public Tuple2<String, Integer> 
reduce(Tuple2<String, Integer> value1,
+                                       Tuple2<String, Integer> value2) throws 
Exception {
+                                       return null;
+                               }
+                       });
+       }
+
+       /**
         * These tests ensure that the fast aligned time windows operator is 
used if the
         * conditions are right.
         */
@@ -76,7 +100,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
                                .window(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+                               .apply(new 
WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -118,12 +142,12 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertFalse(winOperator1.isSetProcessingTime());
                Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
-               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ReducingStateDescriptor);
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
                                .window(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+                               .apply(new 
WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -142,7 +166,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertFalse(winOperator2.isSetProcessingTime());
                Assert.assertTrue(winOperator2.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
-               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
        }
 
        @Test
@@ -168,13 +192,13 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(winOperator1.isSetProcessingTime());
                Assert.assertTrue(winOperator1.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
-               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ReducingStateDescriptor);
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
                                .window(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+                               .apply(new 
WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -193,7 +217,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(winOperator2.isSetProcessingTime());
                Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
-               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
        }
 
        @Test
@@ -220,14 +244,14 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
-               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator1.getStateDescriptor() instanceof 
ListStateDescriptor);
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
                                .window(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
                                .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+                               .apply(new 
WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -247,14 +271,14 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
                Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getEvictor() instanceof 
TimeEvictor);
-               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator2.getStateDescriptor() instanceof 
ListStateDescriptor);
        }
 
        // 
------------------------------------------------------------------------
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class DummyReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
+       public static class DummyReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 0357144..90e63c4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -176,8 +176,15 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window 
function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](function: AllWindowFunction[T, R, 
W]): DataStream[R] = {
-    javaStream.apply(clean(function), implicitly[TypeInformation[R]])
+  def apply[R: TypeInformation: ClassTag](
+      function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = {
+    val cleanedFunction = clean(function)
+    val javaFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
+      def apply(window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
+        cleanedFunction(window, elements.asScala, out)
+      }
+    }
+    javaStream.apply(javaFunction, implicitly[TypeInformation[R]])
   }
 
   /**
@@ -194,7 +201,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   def apply[R: TypeInformation: ClassTag](
       function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     val cleanedFunction = clean(function)
-    val applyFunction = new AllWindowFunction[T, R, W] {
+    val applyFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
       def apply(window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
         cleanedFunction(window, elements.asScala, out)
       }
@@ -232,7 +239,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    */
   def apply[R: TypeInformation: ClassTag](
       preAggregator: (T, T) => T,
-      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+      function: (W, T, Collector[R]) => Unit): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
@@ -247,8 +254,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
 
     val cleanApply = clean(function)
     val applyFunction = new AllWindowFunction[T, R, W] {
-      def apply(window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
-        cleanApply(window, elements.asScala, out)
+      def apply(window: W, input: T, out: Collector[R]): Unit = {
+        cleanApply(window, input, out)
       }
     }
     javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 93b91ff..8a49f40 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -179,8 +179,15 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The window function.
    * @return The data stream that is the result of applying the window 
function to the window.
    */
-  def apply[R: TypeInformation: ClassTag](function: WindowFunction[T, R, K, 
W]): DataStream[R] = {
-    javaStream.apply(clean(function), implicitly[TypeInformation[R]])
+  def apply[R: TypeInformation: ClassTag](
+      function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = {
+    val cleanFunction = clean(function)
+    val javaFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] {
+      def apply(key: K, window: W, input: java.lang.Iterable[T], out: 
Collector[R]) = {
+        cleanFunction.apply(key, window, input.asScala, out)
+      }
+    }
+    javaStream.apply(javaFunction, implicitly[TypeInformation[R]])
   }
 
   /**
@@ -201,7 +208,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     }
 
     val cleanedFunction = clean(function)
-    val applyFunction = new WindowFunction[T, R, K, W] {
+    val applyFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] {
       def apply(key: K, window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
         cleanedFunction(key, window, elements.asScala, out)
       }
@@ -239,7 +246,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    */
   def apply[R: TypeInformation: ClassTag](
       preAggregator: (T, T) => T,
-      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+      function: (K, W, T, Collector[R]) => Unit): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
@@ -254,8 +261,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
 
     val cleanApply = clean(function)
     val applyFunction = new WindowFunction[T, R, K, W] {
-      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
-        cleanApply(key, window, elements.asScala, out)
+      def apply(key: K, window: W, input: T, out: Collector[R]): Unit = {
+        cleanApply(key, window, input, out)
       }
     }
     javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 4ec8f81..d293d1a 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -21,7 +21,8 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.flink.api.common.functions.RichReduceFunction
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.state.ReducingStateDescriptor
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, 
AllWindowFunction}
@@ -75,12 +76,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
-      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
-      def apply(
-                    window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
+      .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), 
TimeWindow]() {
+        def apply(
+            window: TimeWindow,
+            values: Iterable[(String, Int)],
+            out: Collector[(String, Int)]) { }
+      })
 
     val transform2 = window2.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -121,10 +122,10 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val window2 = source
       .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
+      .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), 
TimeWindow]() {
       def apply(
                     window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
+                    values: Iterable[(String, Int)],
                     out: Collector[(String, Int)]) { }
     })
 
@@ -172,10 +173,10 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
-      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
+      .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), 
TimeWindow]() {
       def apply(
                     window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
+                    values: Iterable[(String, Int)],
                     out: Collector[(String, Int)]) { }
     })
 
@@ -210,7 +211,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: java.lang.Iterable[(String, Int)],
+                   values: (String, Int),
                    out: Collector[(String, Int)]) { }
       })
 
@@ -219,12 +220,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator1 = transform1.getOperator
 
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
-      
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+      winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
 
     val window2 = source
@@ -235,7 +236,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: java.lang.Iterable[(String, Int)],
+                   values: (String, Int),
                    out: Collector[(String, Int)]) { }
       })
 
@@ -244,12 +245,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator2 = transform2.getOperator
 
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(
-      
winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+      winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
   }
 
 }
@@ -258,7 +259,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 //  UDFs
 // ------------------------------------------------------------------------
 
-class DummyReducer extends RichReduceFunction[(String, Int)] {
+class DummyReducer extends ReduceFunction[(String, Int)] {
   def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
     value1
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 90cce66..dfb5ea2 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
ReducingStateDescriptor}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
@@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvic
 import org.apache.flink.streaming.api.windowing.time.Time
 import 
org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, 
CountTrigger}
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer,
 PreAggregatingHeapWindowBuffer}
 import 
org.apache.flink.streaming.runtime.operators.windowing.{EvictingWindowOperator, 
WindowOperator, AccumulatingProcessingTimeWindowOperator, 
AggregatingProcessingTimeWindowOperator}
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
@@ -69,11 +69,11 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .window(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+      .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, 
TimeWindow]() {
         def apply(
             key: Tuple,
             window: TimeWindow,
-            values: java.lang.Iterable[(String, Int)],
+            values: Iterable[(String, Int)],
             out: Collector[(String, Int)]) { }
       })
 
@@ -106,23 +106,23 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator1 = transform1.getOperator
 
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
-      
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+      winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
 
     val window2 = source
       .keyBy(0)
       .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+      .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, 
TimeWindow]() {
       def apply(
                     tuple: Tuple,
                     window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
+                    values: Iterable[(String, Int)],
                     out: Collector[(String, Int)]) { }
     })
 
@@ -131,11 +131,11 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator2 = transform2.getOperator
 
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+    
assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
   }
 
   @Test
@@ -164,7 +164,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
     assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
-    
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+    
assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
 
 
     val window2 = source
@@ -172,11 +172,11 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+      .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, 
TimeWindow]() {
       def apply(
                     tuple: Tuple,
                     window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
+                    values: Iterable[(String, Int)],
                     out: Collector[(String, Int)]) { }
     })
 
@@ -190,7 +190,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+    
assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
   }
 
   @Test
@@ -211,7 +211,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: java.lang.Iterable[(String, Int)],
+                   values: (String, Int),
                    out: Collector[(String, Int)]) { }
       })
 
@@ -220,12 +220,12 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator1 = transform1.getOperator
 
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
-      
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+      winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
 
     val window2 = source
@@ -236,7 +236,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: java.lang.Iterable[(String, Int)],
+                   values: (String, Int),
                    out: Collector[(String, Int)]) { }
       })
 
@@ -245,11 +245,11 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val operator2 = transform2.getOperator
 
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(
-      
winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+      winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 18c1b3c..9eca074 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -105,7 +105,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
                                                        NUM_ELEMENTS_PER_KEY / 
3))
                                        .rebalance()
                                        .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(new 
RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
TimeWindow>() {
+                                       .apply(new 
RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -167,7 +167,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
                                        .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
                                        .rebalance()
                                        .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
-                                       .apply(new 
RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
TimeWindow>() {
+                                       .apply(new 
RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -231,23 +231,13 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
                                        .rebalance()
                                        .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS))
                                        .apply(
-                                                       new 
RichReduceFunction<Tuple2<Long, IntType>>() {
-
-                                                               private boolean 
open = false;
-
-                                                               @Override
-                                                               public void 
open(Configuration parameters) {
-                                                                       
assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                                       open = 
true;
-                                                               }
+                                                       new 
ReduceFunction<Tuple2<Long, IntType>>() {
 
                                                                @Override
                                                                public 
Tuple2<Long, IntType> reduce(
                                                                                
Tuple2<Long, IntType> a,
                                                                                
Tuple2<Long, IntType> b) {
 
-                                                                       // 
validate that the function has been opened properly
-                                                                       
assertTrue(open);
                                                                        return 
new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                                                                }
                                                        },
@@ -264,20 +254,13 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
                                                @Override
                                                public void apply(
                                                                TimeWindow 
window,
-                                                               
Iterable<Tuple2<Long, IntType>> values,
+                                                               Tuple2<Long, 
IntType> input,
                                                                
Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
                                                        // validate that the 
function has been opened properly
                                                        assertTrue(open);
 
-                                                       int sum = 0;
-                                                       long key = -1;
-
-                                                       for (Tuple2<Long, 
IntType> value : values) {
-                                                               sum += 
value.f1.value;
-                                                               key = value.f0;
-                                                       }
-                                                       out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+                                                       out.collect(new 
Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
                                                }
                                        })
                                        .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
@@ -317,23 +300,13 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
                                        .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS),
                                                        Time.of(WINDOW_SLIDE, 
MILLISECONDS))
                                        .apply(
-                                                       new 
RichReduceFunction<Tuple2<Long, IntType>>() {
-
-                                                               private boolean 
open = false;
-
-                                                               @Override
-                                                               public void 
open(Configuration parameters) {
-                                                                       
assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                                       open = 
true;
-                                                               }
+                                                       new 
ReduceFunction<Tuple2<Long, IntType>>() {
 
                                                                @Override
                                                                public 
Tuple2<Long, IntType> reduce(
                                                                                
Tuple2<Long, IntType> a,
                                                                                
Tuple2<Long, IntType> b) {
 
-                                                                       // 
validate that the function has been opened properly
-                                                                       
assertTrue(open);
                                                                        return 
new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                                                                }
                                                        },
@@ -350,20 +323,13 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
                                                @Override
                                                public void apply(
                                                                TimeWindow 
window,
-                                                               
Iterable<Tuple2<Long, IntType>> values,
+                                                               Tuple2<Long, 
IntType> input,
                                                                
Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
                                                        // validate that the 
function has been opened properly
                                                        assertTrue(open);
 
-                                                       int sum = 0;
-                                                       long key = -1;
-
-                                                       for (Tuple2<Long, 
IntType> value : values) {
-                                                               sum += 
value.f1.value;
-                                                               key = value.f0;
-                                                       }
-                                                       out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+                                                       out.collect(new 
Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
                                                }
                                        })
                                        .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 7a1a879..5886982 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -18,15 +18,18 @@
 
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -40,9 +43,17 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -55,12 +66,22 @@ import static org.junit.Assert.*;
  * of the emitted windows are deterministic.
  */
 @SuppressWarnings("serial")
+@RunWith(Parameterized.class)
 public class EventTimeWindowCheckpointingITCase extends TestLogger {
 
        private static final int PARALLELISM = 4;
 
        private static ForkableFlinkMiniCluster cluster;
 
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private StateBackendEnum stateBackendEnum;
+       private AbstractStateBackend stateBackend;
+
+       public EventTimeWindowCheckpointingITCase(StateBackendEnum 
stateBackendEnum) {
+               this.stateBackendEnum = stateBackendEnum;
+       }
 
        @BeforeClass
        public static void startTestCluster() {
@@ -81,6 +102,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                }
        }
 
+       @Before
+       public void initStateBackend() throws IOException {
+               switch (stateBackendEnum) {
+                       case MEM:
+                               this.stateBackend = new MemoryStateBackend();
+                               break;
+                       case FILE:
+                               String backups = 
tempFolder.newFolder().getAbsolutePath();
+                               this.stateBackend = new 
FsStateBackend("file://" + backups);
+                               break;
+               }
+       }
+
        // 
------------------------------------------------------------------------
 
        @Test
@@ -99,13 +133,14 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                        env.enableCheckpointing(100);
                        env.setNumberOfExecutionRetries(3);
                        env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
 
                        env
                                        .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, Tuple, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -162,13 +197,14 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                        env.enableCheckpointing(100);
                        env.setNumberOfExecutionRetries(3);
                        env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
 
                        env
                                        .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, Tuple, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -229,13 +265,14 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                        env.enableCheckpointing(100);
                        env.setNumberOfExecutionRetries(3);
                        env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
 
                        env
                                        .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, Tuple, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -292,6 +329,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                        env.enableCheckpointing(100);
                        env.setNumberOfExecutionRetries(3);
                        env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
 
                        env
                                        .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
@@ -299,23 +337,12 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS))
                                        .apply(
-                                                       new 
RichReduceFunction<Tuple2<Long, IntType>>() {
-
-                                                               private boolean 
open = false;
-
-                                                               @Override
-                                                               public void 
open(Configuration parameters) {
-                                                                       
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                                       open = 
true;
-                                                               }
+                                                       new 
ReduceFunction<Tuple2<Long, IntType>>() {
 
                                                                @Override
                                                                public 
Tuple2<Long, IntType> reduce(
                                                                                
Tuple2<Long, IntType> a,
                                                                                
Tuple2<Long, IntType> b) {
-
-                                                                       // 
validate that the function has been opened properly
-                                                                       
assertTrue(open);
                                                                        return 
new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                                                                }
                                                        },
@@ -333,20 +360,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                                                public void apply(
                                                                Tuple tuple,
                                                                TimeWindow 
window,
-                                                               
Iterable<Tuple2<Long, IntType>> values,
+                                                               Tuple2<Long, 
IntType> input,
                                                                
Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
                                                        // validate that the 
function has been opened properly
                                                        assertTrue(open);
 
-                                                       int sum = 0;
-                                                       long key = -1;
-
-                                                       for (Tuple2<Long, 
IntType> value : values) {
-                                                               sum += 
value.f1.value;
-                                                               key = value.f0;
-                                                       }
-                                                       out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+                                                       out.collect(new 
Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
                                                }
                                        })
                                        .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
@@ -377,6 +397,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                        env.enableCheckpointing(100);
                        env.setNumberOfExecutionRetries(3);
                        env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
 
                        env
                                        .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
@@ -384,15 +405,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
                                        .apply(
-                                                       new 
RichReduceFunction<Tuple2<Long, IntType>>() {
-
-                                                               private boolean 
open = false;
-
-                                                               @Override
-                                                               public void 
open(Configuration parameters) {
-                                                                       
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                                       open = 
true;
-                                                               }
+                                                       new 
ReduceFunction<Tuple2<Long, IntType>>() {
 
                                                                @Override
                                                                public 
Tuple2<Long, IntType> reduce(
@@ -400,7 +413,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                                                                                
Tuple2<Long, IntType> b) {
 
                                                                        // 
validate that the function has been opened properly
-                                                                       
assertTrue(open);
                                                                        return 
new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                                                                }
                                                        },
@@ -418,20 +430,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                                                public void apply(
                                                                Tuple tuple,
                                                                TimeWindow 
window,
-                                                               
Iterable<Tuple2<Long, IntType>> values,
+                                                               Tuple2<Long, 
IntType> input,
                                                                
Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
                                                        // validate that the 
function has been opened properly
                                                        assertTrue(open);
 
-                                                       int sum = 0;
-                                                       long key = -1;
-
-                                                       for (Tuple2<Long, 
IntType> value : values) {
-                                                               sum += 
value.f1.value;
-                                                               key = value.f0;
-                                                       }
-                                                       out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+                                                       out.collect(new 
Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
                                                }
                                        })
                                        .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
@@ -583,7 +588,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                                        }
                                }
                        }
-                       assertTrue("The source must see all expected windows.", 
seenAll);
+                       assertTrue("The sink must see all expected windows.", 
seenAll);
                }
 
                @Override
@@ -723,6 +728,25 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                }
        }
 
+       // 
------------------------------------------------------------------------
+       //  Parametrization for testing with different state backends
+       // 
------------------------------------------------------------------------
+
+
+       @Parameterized.Parameters(name = "StateBackend = {0}")
+       @SuppressWarnings("unchecked,rawtypes")
+       public static Collection<Object[]> parameters(){
+               return Arrays.asList(new Object[][] {
+                               {StateBackendEnum.MEM},
+                               {StateBackendEnum.FILE},
+                       }
+               );
+       }
+
+       private enum StateBackendEnum {
+               MEM, FILE, DB, ROCKSDB
+       }
+
 
        // 
------------------------------------------------------------------------
        //  Utilities

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 8d59975..c9286ce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -19,15 +19,15 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -117,7 +117,7 @@ public class WindowCheckpointingITCase extends TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(100, MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, 
TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, 
Tuple, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -175,7 +175,7 @@ public class WindowCheckpointingITCase extends TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(150, MILLISECONDS), 
Time.of(50, MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, 
TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, 
Tuple, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -240,23 +240,12 @@ public class WindowCheckpointingITCase extends TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(100, MILLISECONDS))
-                                       .reduce(new 
RichReduceFunction<Tuple2<Long, IntType>>() {
-
-                                               private boolean open = false;
-
-                                               @Override
-                                               public void open(Configuration 
parameters) {
-                                                       
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                       open = true;
-                                               }
+                                       .reduce(new ReduceFunction<Tuple2<Long, 
IntType>>() {
 
                                                @Override
                                                public Tuple2<Long, IntType> 
reduce(
                                                                Tuple2<Long, 
IntType> a,
                                                                Tuple2<Long, 
IntType> b) {
-
-                                                       // validate that the 
function has been opened properly
-                                                       assertTrue(open);
                                                        return new 
Tuple2<>(a.f0, new IntType(1));
                                                }
                                        })
@@ -299,23 +288,11 @@ public class WindowCheckpointingITCase extends TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(150, MILLISECONDS), 
Time.of(50, MILLISECONDS))
-                                       .reduce(new 
RichReduceFunction<Tuple2<Long, IntType>>() {
-
-                                               private boolean open = false;
-
-                                               @Override
-                                               public void open(Configuration 
parameters) {
-                                                       
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                       open = true;
-                                               }
-
+                                       .reduce(new ReduceFunction<Tuple2<Long, 
IntType>>() {
                                                @Override
                                                public Tuple2<Long, IntType> 
reduce(
                                                                Tuple2<Long, 
IntType> a,
                                                                Tuple2<Long, 
IntType> b) {
-
-                                                       // validate that the 
function has been opened properly
-                                                       assertTrue(open);
                                                        return new 
Tuple2<>(a.f0, new IntType(1));
                                                }
                                        })

Reply via email to