Repository: flink
Updated Branches:
  refs/heads/master 87b907736 -> 788b83921


http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index b6c1618..34eac9e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.datastream.AllWindowedStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
@@ -383,6 +384,119 @@ public class AllWindowTranslationTest {
                processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
        }
 
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithProcessWindowFunctionEventTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .reduce(reducer, new 
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, 
Integer>, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithProcessWindowFunctionProcessingTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .reduce(new DummyReducer(), new 
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, 
Integer>, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithEvictorAndProcessFunction() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .reduce(
+                                               reducer,
+                                               new 
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, 
TimeWindow>() {
+                                                       @Override
+                                                       public void process(
+                                                                       Context 
context,
+                                                                       
Iterable<Tuple2<String, Integer>> elements,
+                                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                                               for 
(Tuple2<String, Integer> in : elements) {
+                                                                       
out.collect(in);
+                                                               }
+                                                       }
+                                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
CountEvictor);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
        /**
         * Test for the deprecated .apply(Reducer, WindowFunction).
         */
@@ -540,6 +654,226 @@ public class AllWindowTranslationTest {
                                operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
        }
 
+       @Test
+       public void testAggregateWithEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .aggregate(new DummyAggregationFunction());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+       }
+
+       @Test
+       public void testAggregateWithEvictorAndProcessFunction() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .aggregate(
+                                               new DummyAggregationFunction(),
+                                               new 
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, 
TimeWindow>() {
+                                                       @Override
+                                                       public void process(
+                                                                       Context 
context,
+                                                                       
Iterable<Tuple2<String, Integer>> elements,
+                                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                                               for 
(Tuple2<String, Integer> in : elements) {
+                                                                       
out.collect(in);
+                                                               }
+                                                       }
+                                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+       }
+
+       // ------------------------------------------------------------------------
+       //  process() translation tests
+       // ------------------------------------------------------------------------
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testProcessEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .process(new 
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testProcessProcessingTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .process(new 
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testProcessWithEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
+                               .process(new 
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
TimeEvictor);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testProcessWithCustomTrigger() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .process(new 
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+
        // ------------------------------------------------------------------------
        //  fold() translation tests
        // ------------------------------------------------------------------------
@@ -666,6 +1000,117 @@ public class AllWindowTranslationTest {
 
        @Test
        @SuppressWarnings("rawtypes")
+       public void testFoldWithProcessAllWindowFunctionEventTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .fold(new Tuple3<>("", "", 0), new 
DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+                                       @Override
+                                       public void process(
+                                                       Context ctx,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldWithProcessAllWindowFunctionProcessingTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .fold(new Tuple3<>("", "empty", 0), new 
DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(
+                                                       Context ctx,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       public void testFoldWithEvictorAndProcessFunction() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window1 = source
+                               
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .fold(
+                                               new Tuple3<>("", "", 1),
+                                               new DummyFolder(),
+                                               new 
ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, 
String, Integer>, TimeWindow>() {
+                                                       @Override
+                                                       public void process(
+                                                                       Context 
context,
+                                                                       
Iterable<Tuple3<String, String, Integer>> elements,
+                                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                                               for 
(Tuple3<String, String, Integer> in : elements) {
+                                                                       
out.collect(in);
+                                                               }
+                                                       }
+                                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
CountEvictor);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               winOperator.setOutputType((TypeInformation) window1.getType(), 
new ExecutionConfig());
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
        public void testApplyWithPreFolderEventTime() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index cf062fc..694353c 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -18,20 +18,19 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{PublicEvolving, Public}
+import org.apache.flink.annotation.{Public, PublicEvolving}
 import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, 
ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => 
JavaAllWStream}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
-import org.apache.flink.streaming.api.scala.function.AllWindowFunction
-import 
org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, 
ScalaAllWindowFunctionWrapper, ScalaReduceFunction, ScalaFoldFunction}
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, 
ProcessAllWindowFunction}
+import 
org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, 
ScalaAllWindowFunctionWrapper, ScalaFoldFunction, 
ScalaProcessAllWindowFunctionWrapper, ScalaReduceFunction}
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
-
 import org.apache.flink.util.Preconditions.checkNotNull
 
 /**
@@ -199,6 +198,64 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     asScalaStream(javaStream.reduce(reducer, applyFunction, returnType))
   }
 
+  /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The process window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: ReduceFunction[T],
+      windowFunction: ProcessAllWindowFunction[T, R, W]): DataStream[R] = {
+
+    val cleanedReducer = clean(preAggregator)
+    val cleanedWindowFunction = clean(windowFunction)
+
+    val applyFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, 
W](cleanedWindowFunction)
+
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(cleanedReducer, applyFunction, returnType))
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The process window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: (T, T) => T,
+      windowFunction: ProcessAllWindowFunction[T, R, W]): DataStream[R] = {
+
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (windowFunction == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+
+    val reducer = new ScalaReduceFunction[T](cleanReducer)
+    val applyFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, 
W](cleanWindowFunction)
+
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(reducer, applyFunction, returnType))
+  }
+
   // --------------------------- aggregate() ----------------------------------
 
   /**
@@ -257,6 +314,39 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
 
   /**
    * Applies the given window function to each window. The window function is 
called for each
+   * evaluation of the window. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given aggregation function.
+   *
+   * @param preAggregator The aggregation function that is used for 
pre-aggregation
+   * @param windowFunction The process window function.
+   * @return The data stream that is the result of applying the window 
function to the window.
+   */
+  @PublicEvolving
+  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
+      (preAggregator: AggregateFunction[T, ACC, V],
+       windowFunction: ProcessAllWindowFunction[V, R, W]): DataStream[R] = {
+
+    checkNotNull(preAggregator, "AggregationFunction must not be null")
+    checkNotNull(windowFunction, "Window function must not be null")
+
+    val cleanedPreAggregator = clean(preAggregator)
+    val cleanedWindowFunction = clean(windowFunction)
+
+    val applyFunction = new ScalaProcessAllWindowFunctionWrapper[V, R, 
W](cleanedWindowFunction)
+
+    val accumulatorType: TypeInformation[ACC] = 
implicitly[TypeInformation[ACC]]
+    val aggregationResultType: TypeInformation[V] = 
implicitly[TypeInformation[V]]
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+
+    asScalaStream(javaStream.aggregate(
+      cleanedPreAggregator, applyFunction,
+      accumulatorType, aggregationResultType, resultType))
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is 
called for each
    * evaluation of the window. The output of the window function is
    * interpreted as a regular non-windowed stream.
    *
@@ -367,6 +457,37 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     *
     * @param initialValue Initial value of the fold
     * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The process window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  @PublicEvolving
+  def fold[ACC: TypeInformation, R: TypeInformation](
+      initialValue: ACC,
+      preAggregator: FoldFunction[T, ACC],
+      windowFunction: ProcessAllWindowFunction[ACC, R, W]): DataStream[R] = {
+
+    val cleanFolder = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+
+    val applyFunction = new ScalaProcessAllWindowFunctionWrapper[ACC, R, 
W](cleanWindowFunction)
+
+    asScalaStream(javaStream.fold(
+      initialValue,
+      cleanFolder,
+      applyFunction,
+      implicitly[TypeInformation[ACC]],
+      implicitly[TypeInformation[R]]))
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation folder.
+    *
+    * @param initialValue Initial value of the fold
+    * @param preAggregator The fold function that is used for pre-aggregation
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window 
function to the window.
     */
@@ -393,6 +514,42 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     asScalaStream(javaStream.fold(initialValue, folder, applyFunction, 
accType, returnType))
   }
 
+  /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation folder.
+    *
+    * @param initialValue Initial value of the fold
+    * @param preAggregator The fold function that is used for pre-aggregation
+    * @param windowFunction The process window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  @PublicEvolving
+  def fold[ACC: TypeInformation, R: TypeInformation](
+      initialValue: ACC,
+      preAggregator: (ACC, T) => ACC,
+      windowFunction: ProcessAllWindowFunction[ACC, R, W]): DataStream[R] = {
+
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (windowFunction == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanFolder = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+
+    val folder = new ScalaFoldFunction[T, ACC](cleanFolder)
+    val applyFunction = new ScalaProcessAllWindowFunctionWrapper[ACC, R, 
W](cleanWindowFunction)
+
+    val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.fold(initialValue, folder, applyFunction, 
accType, returnType))
+  }
+
   // ---------------------------- apply() -------------------------------------
 
   /**
@@ -403,6 +560,27 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
    * Not that this function requires that all data in the windows is buffered 
until the window
    * is evaluated, as the function provides no means of pre-aggregation.
    *
+   * @param function The process window function.
+   * @return The data stream that is the result of applying the window 
function to the window.
+   */
+  @PublicEvolving
+  def process[R: TypeInformation](
+      function: ProcessAllWindowFunction[T, R, W]): DataStream[R] = {
+
+    val cleanedFunction = clean(function)
+    val javaFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, 
W](cleanedFunction)
+
+    asScalaStream(javaStream.process(javaFunction, 
implicitly[TypeInformation[R]]))
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is 
called for each
+   * evaluation of the window. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Note that this function requires that all data in the windows is buffered until the window
+   * is evaluated, as the function provides no means of pre-aggregation.
+   *
    * @param function The window function.
    * @return The data stream that is the result of applying the window 
function to the window.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
new file mode 100644
index 0000000..163117b
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * Base abstract class for functions that are evaluated over non-keyed
+  * windows using a context for retrieving extra information.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam W The type of the window.
+  */
+@PublicEvolving
+abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function 
with Serializable {
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param context  The context in which the window is being evaluated.
+    * @param elements The elements in the window being evaluated.
+    * @param out      A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program 
and trigger recovery.
+    */
+  @throws[Exception]
+  def process(context: Context, elements: Iterable[IN], out: Collector[OUT])
+
+  /**
+    * The context holding window metadata
+    */
+  abstract class Context {
+    /**
+      * @return The window that is being evaluated.
+      */
+    def window: W
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
new file mode 100644
index 0000000..22d64a8
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.beans.Transient
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, 
RichFunction, RuntimeContext}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base abstract class for functions that are evaluated over
+  * non-keyed windows using a context for retrieving extra information.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam W The type of the window.
+  */
+@Public
+abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
+    extends ProcessAllWindowFunction[IN, OUT, W]
+    with RichFunction {
+
+  @Transient
+  private var runtimeContext: RuntimeContext = null
+
+  // 
--------------------------------------------------------------------------------------------
+  //  Runtime context access
+  // 
--------------------------------------------------------------------------------------------
+
+  override def setRuntimeContext(t: RuntimeContext) {
+    this.runtimeContext = t
+  }
+
+  override def getRuntimeContext: RuntimeContext = {
+    if (this.runtimeContext != null) {
+      this.runtimeContext
+    }
+    else {
+      throw new IllegalStateException("The runtime context has not been 
initialized.")
+    }
+  }
+
+  override def getIterationRuntimeContext: IterationRuntimeContext = {
+    if (this.runtimeContext == null) {
+      throw new IllegalStateException("The runtime context has not been 
initialized.")
+    }
+    else {
+      this.runtimeContext match {
+        case iterationRuntimeContext: IterationRuntimeContext => 
iterationRuntimeContext
+        case _ =>
+          throw new IllegalStateException("This stub is not part of an 
iteration step function.")
+      }
+    }
+  }
+
+  // 
--------------------------------------------------------------------------------------------
+  //  Default life cycle methods
+  // 
--------------------------------------------------------------------------------------------
+
+  @throws[Exception]
+  override def open(parameters: Configuration) {
+  }
+
+  @throws[Exception]
+  override def close() {
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 23293a6..a4fec64 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -18,8 +18,16 @@
 
 package org.apache.flink.streaming.api.scala.function.util
 
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
 import 
org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => 
JProcessWindowFunction}
-import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import 
org.apache.flink.streaming.api.functions.windowing.{RichProcessWindowFunction 
=> JRichProcessWindowFunction}
+import 
org.apache.flink.streaming.api.functions.windowing.{RichProcessAllWindowFunction
 => JRichProcessAllWindowFunction}
+import 
org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => 
JProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => 
ScalaProcessWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction 
=> ScalaProcessAllWindowFunction}
+import 
org.apache.flink.streaming.api.scala.function.{RichProcessWindowFunction => 
ScalaRichProcessWindowFunction}
+import 
org.apache.flink.streaming.api.scala.function.{RichProcessAllWindowFunction => 
ScalaRichProcessAllWindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -34,8 +42,8 @@ import scala.collection.JavaConverters._
   *   - Java WindowFunction: java.lang.Iterable
   */
 final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
-    private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
-    extends JProcessWindowFunction[IN, OUT, KEY, W] {
+    private[this] val func: ScalaProcessWindowFunction[IN, OUT, KEY, W])
+    extends JRichProcessWindowFunction[IN, OUT, KEY, W] {
 
   override def process(
       key: KEY,
@@ -47,4 +55,75 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, 
W <: Window](
     }
     func.process(key, ctx, elements.asScala, out)
   }
+
+  override def setRuntimeContext(t: RuntimeContext): Unit = {
+    super.setRuntimeContext(t)
+    func match {
+      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => 
rfunc.setRuntimeContext(t)
+      case _ =>
+    }
+  }
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    func match {
+      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => 
rfunc.open(parameters)
+      case _ =>
+    }
+  }
+
+  override def close(): Unit = {
+    super.close()
+    func match {
+      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => 
rfunc.close()
+      case _ =>
+    }
+  }
+}
+
+/**
+  * A wrapper function that exposes a Scala ProcessAllWindowFunction
+  * as a Java ProcessAllWindowFunction.
+  *
+  * The Scala and Java Window functions differ in their type of "Iterable":
+  *   - Scala WindowFunction: scala.Iterable
+  *   - Java WindowFunction: java.lang.Iterable
+  */
+final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
+    private[this] val func: ScalaProcessAllWindowFunction[IN, OUT, W])
+    extends JRichProcessAllWindowFunction[IN, OUT, W] {
+
+  override def process(
+      context: JProcessAllWindowFunction[IN, OUT, W]#Context,
+      elements: java.lang.Iterable[IN],
+      out: Collector[OUT]): Unit = {
+    val ctx = new func.Context {
+      override def window = context.window
+    }
+    func.process(ctx, elements.asScala, out)
+  }
+
+  override def setRuntimeContext(t: RuntimeContext): Unit = {
+    super.setRuntimeContext(t)
+    func match {
+      case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => 
rfunc.setRuntimeContext(t)
+      case _ =>
+    }
+  }
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    func match {
+      case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => 
rfunc.open(parameters)
+      case _ =>
+    }
+  }
+
+  override def close(): Unit = {
+    super.close()
+    func match {
+      case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => 
rfunc.close()
+      case _ =>
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 7e067a0..ee9f50c 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.scala.function.{WindowFunction, 
AllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, 
ProcessAllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
@@ -356,6 +356,85 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testReduceWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer, new ProcessAllWindowFunction[(String, Int), (String, 
Int), TimeWindow] {
+          override def process(context: Context,
+                               elements: Iterable[(String, Int)],
+                               out: Collector[(String, Int)]): Unit = {
+            elements foreach ( x => out.collect(x))
+          }
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ 
<: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer, new ProcessAllWindowFunction[(String, Int), (String, 
Int), TimeWindow] {
+          override def process(
+              context: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => 
out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ 
<: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testApplyWithPreReducerEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -566,6 +645,72 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testAggregateWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    // Translation test; the characteristic is IngestionTime while the assigner is event-time.
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    // Pre-aggregate with DummyAggregator, post-process with TestProcessAllWindowFunction.
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .aggregate(new DummyAggregator(), new TestProcessAllWindowFunction())
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ 
<: Window]]
+    // Expect EventTimeTrigger, the tumbling event-time assigner and aggregating state.
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_,
 _, _]])
+    // Push one element through the operator and require that some output is produced.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testAggregateWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    // Translation test: only the generated operator is inspected, env.execute is never called.
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    // Pre-aggregate with DummyAggregator, post-process with TestProcessAllWindowFunction.
+    val window1 = source
+        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+        .aggregate(new DummyAggregator(), new TestProcessAllWindowFunction())
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ 
<: Window]]
+    // Expect ProcessingTimeTrigger, the tumbling processing-time assigner and aggregating state.
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_,
 _, _]])
+    // Push one element through the operator and require that some output is produced.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testAggregateWithWindowFunctionEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -789,6 +934,88 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testFoldWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    // Translation test; the characteristic is IngestionTime while the assigner is event-time.
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    // Fold into a (String, String, Int) accumulator; the window function emits fields 1 and 3.
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
+      .fold(
+        ("", "", 1),
+        new DummyFolder,
+        new ProcessAllWindowFunction[(String, String, Int), (String, Int), 
TimeWindow] {
+          override def process(
+              context: Context,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => 
out.collect((x._1, x._3))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ 
<: Window]]
+    // Expect EventTimeTrigger, the tumbling event-time assigner and folding state.
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_,
 _]])
+    // Push one element through the operator and require that some output is produced.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    // Translation test: only the generated operator is inspected, env.execute is never called.
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    // Fold into a (String, String, Int) accumulator; the window function emits fields 1 and 3.
+    val window1 = source
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
+      .fold(
+        ("", "", 1),
+        new DummyFolder,
+        new ProcessAllWindowFunction[(String, String, Int), (String, Int), 
TimeWindow] {
+          override def process(
+              context: Context,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => 
out.collect((x._1, x._3))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ 
<: Window]]
+    // Expect ProcessingTimeTrigger, the tumbling processing-time assigner and folding state.
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_,
 _]])
+    // Push one element through the operator and require that some output is produced.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testApplyWithPreFolderEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -951,6 +1178,84 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testProcessEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    // Translation test; the characteristic is IngestionTime while the assigner is event-time.
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    // process() alone, with a window function that re-emits each (String, Int) element.
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
+      .process(
+        new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] 
{
+          override def process(
+              context: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => 
out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ 
<: Window]]
+    // Expect EventTimeTrigger, the tumbling event-time assigner and list state.
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    // Push one element through the operator and require that some output is produced.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testProcessProcessingTimeTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    // NOTE(review): the method name repeats "Time" (...ProcessingTimeTime); likely a typo.
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    // process() alone, with a window function that re-emits each (String, Int) element.
+    val window1 = source
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
+      .process(
+        new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] 
{
+          override def process(
+              context: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => 
out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ 
<: Window]]
+    // Expect ProcessingTimeTrigger, the tumbling processing-time assigner and list state.
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    // Push one element through the operator and require that some output is produced.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testApplyEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1095,6 +1400,46 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testProcessWithCustomTrigger() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    // Translation test; the characteristic is IngestionTime while the assigner is event-time.
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    // Same shape as the plain process() case, but with an explicit CountTrigger.of(1).
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .process(
+        new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] 
{
+          override def process(
+              context: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => 
out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ 
<: Window]]
+    // Expect the configured CountTrigger, the tumbling event-time assigner and list state.
+    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    // Push one element through the operator and require that some output is produced.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testReduceWithEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1211,6 +1556,46 @@ class AllWindowTranslationTest {
       ("hello", 1))
   }
 
+  @Test
+  def testProcessWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    // Translation test; the characteristic is IngestionTime while the assigner is event-time.
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    // process() behind a CountEvictor.of(100); the operator type is checked further down.
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .process(
+        new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] 
{
+          override def process(
+              context: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => 
out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    // With an evictor configured the operator must be an EvictingWindowOperator.
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: 
Window]])
+
+    val winOperator = operator
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, 
Int), _ <: Window]]
+    // Expect EventTimeTrigger, the CountEvictor, the event-time assigner and list state.
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    
assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    // Push one element through the operator and require that some output is produced.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
 
   /**
     * Ensure that we get some output from the given operator when pushing in 
an element and
@@ -1218,10 +1603,10 @@ class AllWindowTranslationTest {
     */
   @throws[Exception]
   private def processElementAndEnsureOutput[K, IN, OUT](
-                                                         operator: 
OneInputStreamOperator[IN, OUT],
-                                                         keySelector: 
KeySelector[IN, K],
-                                                         keyType: 
TypeInformation[K],
-                                                         element: IN) {
+      operator: OneInputStreamOperator[IN, OUT],
+      keySelector: KeySelector[IN, K],
+      keyType: TypeInformation[K],
+      element: IN) {
     val testHarness =
       new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, 
keySelector, keyType)
 
@@ -1243,7 +1628,8 @@ class AllWindowTranslationTest {
   }
 }
 
-class TestAllWindowFunction extends AllWindowFunction[(String, Int), (String, 
Int), TimeWindow] {
+class TestAllWindowFunction
+    extends AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
 
   override def apply(
       window: TimeWindow,
@@ -1253,3 +1639,15 @@ class TestAllWindowFunction extends 
AllWindowFunction[(String, Int), (String, In
     input.foreach(out.collect)
   }
 }
+
+class TestProcessAllWindowFunction
+    extends ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] 
{
+  // Pass-through window function: forwards every input element to the collector unchanged.
+  override def process(
+      context: Context,
+      input: Iterable[(String, Int)],
+      out: Collector[(String, Int)]): Unit = {
+
+    input.foreach(out.collect)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index a23145c..dc38758 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import 
org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction,
 CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
+import 
org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction,
 CheckingIdentityRichProcessAllWindowFunction, 
CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -150,7 +150,6 @@ class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
   }
 
   @Test
-  @Ignore
   def testFoldWithProcessWindowFunction(): Unit = {
     WindowFoldITCase.testResults = mutable.MutableList()
     CheckingIdentityRichProcessWindowFunction.reset()
@@ -310,6 +309,63 @@ class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
 
     CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
   }
+
+  @Test
+  def testFoldAllWithProcessWindowFunction(): Unit = {
+    WindowFoldITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessAllWindowFunction.reset()
+    // Fold step: appends the string field to the accumulator and adds up the int field.
+    val foldFunc = new FoldFunction[(String, Int), (String, Int)] {
+      override def fold(accumulator: (String, Int), value: (String, Int)): 
(String, Int) = {
+        (accumulator._1 + value._1, accumulator._2 + value._2)
+      }
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+    // Finite source of nine tuples; timestamps/watermarks come from Tuple2TimestampExtractor.
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("a", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("a", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 5))
+
+        // source is finite, so it will have an implicit MAX watermark when it 
finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new 
WindowFoldITCase.Tuple2TimestampExtractor)
+    // 3 ms tumbling all-window: fold, then the lifecycle-checking identity process function.
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+      .fold(
+        ("R:", 0),
+        foldFunc,
+        new CheckingIdentityRichProcessAllWindowFunction[(String, Int), 
TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowFoldITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Fold All-Window Test")
+    // Two windows expected, assuming the extractor uses the Int field as timestamp (TODO confirm).
+    val expectedResult = mutable.MutableList(
+      "(R:aaa,3)",
+      "(R:bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+    // Fails unless setRuntimeContext, open and close were all observed on the rich function.
+    CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls()
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
index bfbe6ee..eb9f361 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import 
org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction,
 CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
+import 
org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction,
 CheckingIdentityRichProcessAllWindowFunction, 
CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -87,7 +87,6 @@ class WindowFunctionITCase {
   }
 
   @Test
-  @Ignore
   def testRichProcessWindowFunction(): Unit = {
     WindowFunctionITCase.testResults = mutable.MutableList()
     CheckingIdentityRichProcessWindowFunction.reset()
@@ -183,6 +182,54 @@ class WindowFunctionITCase {
 
     CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
   }
+
+  @Test
+  def testRichProcessAllWindowFunction(): Unit = {
+    WindowFunctionITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessAllWindowFunction.reset()
+    // End-to-end run of a rich ProcessAllWindowFunction on a 3 ms tumbling all-window.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+    // Finite source of nine tuples; timestamps/watermarks come from Tuple2TimestampExtractor.
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it 
finishes
+      }
+
+      def cancel() {}
+
+    }).assignTimestampsAndWatermarks(new 
WindowFunctionITCase.Tuple2TimestampExtractor)
+    // The identity process function re-emits every element; the sink records them as strings.
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+      .process(new CheckingIdentityRichProcessAllWindowFunction[(String, Int), 
TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowFunctionITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("RichProcessAllWindowFunction Test")
+    // Every input element must come out exactly once; order is ignored by sorting both sides.
+    val expectedResult = mutable.MutableList(
+      "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
+      "(b,3)", "(b,4)", "(b,5)")
+
+    assertEquals(expectedResult.sorted, 
WindowFunctionITCase.testResults.sorted)
+    // Fails unless setRuntimeContext, open and close were all observed on the rich function.
+    CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls()
+  }
 }
 
 object WindowFunctionITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index 5418108..ee1dbfd 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import 
org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction,
 CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
+import 
org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction,
 CheckingIdentityRichProcessAllWindowFunction, 
CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -149,7 +149,6 @@ class WindowReduceITCase extends 
StreamingMultipleProgramsTestBase {
   }
 
   @Test
-  @Ignore
   def testReduceWithProcessWindowFunction(): Unit = {
     WindowReduceITCase.testResults = mutable.MutableList()
     CheckingIdentityRichProcessWindowFunction.reset()
@@ -307,6 +306,62 @@ class WindowReduceITCase extends 
StreamingMultipleProgramsTestBase {
 
     CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
   }
+
+  @Test
+  def testReduceAllWithProcessWindowFunction(): Unit = {
+    WindowReduceITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessAllWindowFunction.reset()
+    // Reduce step: concatenates the string fields and adds up the int fields.
+    val reduceFunc = new ReduceFunction[(String, Int)] {
+      override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = 
{
+        (a._1 + b._1, a._2 + b._2)
+      }
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+    // Finite source of nine tuples; timestamps/watermarks come from Tuple2TimestampExtractor.
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("a", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("a", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 5))
+
+        // source is finite, so it will have an implicit MAX watermark when it 
finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new 
WindowReduceITCase.Tuple2TimestampExtractor)
+    // 3 ms tumbling all-window: reduce, then the lifecycle-checking identity process function.
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+      .reduce(
+        reduceFunc,
+        new CheckingIdentityRichProcessAllWindowFunction[(String, Int), 
TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Reduce All-Window Test")
+    // Two windows expected, assuming the extractor uses the Int field as timestamp (TODO confirm).
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+    // Fails unless setRuntimeContext, open and close were all observed on the rich function.
+    CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls()
+  }
 }
 
 object WindowReduceITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
new file mode 100644
index 0000000..df005fa
--- /dev/null
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.testutils
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.scala.function.RichProcessAllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+
+class CheckingIdentityRichProcessAllWindowFunction[T, W <: Window]
+  extends RichProcessAllWindowFunction[T, T, W] {
+  // Identity window function that records its lifecycle callbacks in the companion object.
+  override def process(context: Context, input: Iterable[T], out: 
Collector[T]): Unit = {
+    for (value <- input) {
+      out.collect(value)
+    }
+  }
+  // Each override below delegates to super and then sets the matching companion flag.
+  override def open(conf: Configuration): Unit = {
+    super.open(conf)
+    CheckingIdentityRichProcessAllWindowFunction.openCalled = true
+  }
+
+  override def close(): Unit = {
+    super.close()
+    CheckingIdentityRichProcessAllWindowFunction.closeCalled = true
+  }
+
+  override def setRuntimeContext(context: RuntimeContext): Unit = {
+    super.setRuntimeContext(context)
+    CheckingIdentityRichProcessAllWindowFunction.contextSet = true
+  }
+}
+
+object CheckingIdentityRichProcessAllWindowFunction {
+  // Lifecycle flags set by the companion class; @volatile presumably for cross-thread reads.
+  @volatile
+  private[CheckingIdentityRichProcessAllWindowFunction] var closeCalled = false
+
+  @volatile
+  private[CheckingIdentityRichProcessAllWindowFunction] var openCalled = false
+
+  @volatile
+  private[CheckingIdentityRichProcessAllWindowFunction] var contextSet = false
+  // Clears all three flags so that the next check only sees callbacks from a fresh run.
+  def reset(): Unit = {
+    closeCalled = false
+    openCalled = false
+    contextSet = false
+  }
+  // Throws AssertionError for the first of contextSet/openCalled/closeCalled that is false.
+  def checkRichMethodCalls(): Unit = {
+    if (!contextSet) {
+      throw new AssertionError("context not set")
+    }
+    if (!openCalled) {
+      throw new AssertionError("open() not called")
+    }
+    if (!closeCalled) {
+      throw new AssertionError("close() not called")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 7d37d1a..903179d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -253,6 +254,78 @@ public class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
                Assert.assertEquals(expectedResult, testResults);
        }
 
+       /**
+        * Runs a parallelism-1 job that folds the elements of 3 ms tumbling event-time
+        * all-windows, passes each fold result through a ProcessAllWindowFunction, and
+        * compares the strings collected by the sink with the expected rows.
+        */
+       @Test
+       public void testFoldProcessAllWindow() throws Exception {
+
+               testResults = new ArrayList<>();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(1);
+
+               // Timestamps/watermarks come from Tuple2TimestampExtractor, whose body is not
+               // visible here; the expected rows below are consistent with the Integer field
+               // being used as the timestamp -- TODO confirm.
+               DataStream<Tuple2<String, Integer>> source1 = env.addSource(new 
SourceFunction<Tuple2<String, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Tuple2<String, Integer>> 
ctx) throws Exception {
+                               ctx.collect(Tuple2.of("a", 0));
+                               ctx.collect(Tuple2.of("a", 1));
+                               ctx.collect(Tuple2.of("a", 2));
+
+                               ctx.collect(Tuple2.of("b", 3));
+                               ctx.collect(Tuple2.of("b", 4));
+                               ctx.collect(Tuple2.of("b", 5));
+
+                               ctx.collect(Tuple2.of("a", 6));
+                               ctx.collect(Tuple2.of("a", 7));
+                               ctx.collect(Tuple2.of("a", 8));
+
+                               // source is finite, so it will have an 
implicit MAX watermark when it finishes
+                       }
+
+                       @Override
+                       public void cancel() {}
+
+               }).assignTimestampsAndWatermarks(new 
Tuple2TimestampExtractor());
+
+               source1
+                       .windowAll(TumblingEventTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+                       .fold(Tuple2.of(0, "R:"), new 
FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
+                               @Override
+                               public Tuple2<Integer, String> 
fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws 
Exception {
+                                       // Append the element's String field and add its Integer field to the sum.
+                                       accumulator.f1 += value.f0;
+                                       accumulator.f0 += value.f1;
+                                       return accumulator;
+                               }
+                       }, new ProcessAllWindowFunction<Tuple2<Integer, 
String>, Tuple3<String, Integer, Integer>, TimeWindow>() {
+                               @Override
+                               public void process(Context context, 
Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, Integer, 
Integer>> out) throws Exception {
+                                       // Emit (folded string, folded sum, position in the iterable); every
+                                       // expected row below has position 0.
+                                       int i = 0;
+                                       for (Tuple2<Integer, String> in : 
elements) {
+                                               out.collect(new Tuple3<>(in.f1, 
in.f0, i++));
+                                       }
+                               }
+                       })
+                       .addSink(new SinkFunction<Tuple3<String, Integer, 
Integer>>() {
+                               @Override
+                               public void invoke(Tuple3<String, Integer, 
Integer> value) throws Exception {
+                                       testResults.add(value.toString());
+                               }
+                       });
+
+               env.execute("Fold Process Window Test");
+
+               List<String> expectedResult = Arrays.asList(
+                       "(R:aaa,3,0)",
+                       "(R:aaa,21,0)",
+                       "(R:bbb,12,0)");
+
+               // Sort both lists so the comparison does not depend on emission order.
+               Collections.sort(expectedResult);
+               Collections.sort(testResults);
+
+               Assert.assertEquals(expectedResult, testResults);
+       }
+
        private static class Tuple2TimestampExtractor implements 
AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
 
                @Override

Reply via email to