[FLINK-5237] Consolidate and harmonize Window Translation Tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5368a7d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5368a7d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5368a7d3

Branch: refs/heads/master
Commit: 5368a7d32d96beb1b8298b87d9ea6d42ea306947
Parents: fe2a301
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Nov 24 08:14:48 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../operators/StateDescriptorPassingTest.java   |  26 +
 .../windowing/WindowTranslationTest.java        | 718 +++++++++++++++--
 .../ScalaProcessWindowFunctionWrapper.scala     |  16 +-
 .../api/scala/WindowTranslationTest.scala       | 766 +++++++++++++++++--
 4 files changed, 1420 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 26cb7ac..813ca96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -136,6 +137,31 @@ public class StateDescriptorPassingTest {
        }
 
        @Test
+       public void testProcessWindowState() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, 
JavaSerializer.class);
+
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .keyBy(new KeySelector<File, String>() {
+                                       @Override
+                                       public String getKey(File value) {
+                                               return null;
+                                       }
+                               })
+                               .timeWindow(Time.milliseconds(1000))
+                               .process(new ProcessWindowFunction<File, 
String, String, TimeWindow>() {
+                                       @Override
+                                       public void process(String s, Context 
ctx,
+                                                       Iterable<File> input, 
Collector<String> out) {}
+                               });
+
+               validateListStateDescriptorConfigured(result);
+       }
+
+       @Test
        public void testFoldWindowAllState() throws Exception {
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index f72a2f1..b899948 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -31,14 +31,17 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
@@ -112,7 +115,7 @@ public class WindowTranslationTest {
         * in a {@code AggregatingState}.
         */
        @Test(expected = UnsupportedOperationException.class)
-       public void testAgrgegateWithRichFunctionFails() throws Exception {
+       public void testAggregateWithRichFunctionFails() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -405,6 +408,82 @@ public class WindowTranslationTest {
                processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
        }
 
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithProcesWindowFunctionEventTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .reduce(reducer, new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, 
String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(String key,
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithProcessWindowFunctionProcessingTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .reduce(new DummyReducer(), new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, 
String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(String tuple,
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ReducingStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
        /**
         * Test for the deprecated .apply(Reducer, WindowFunction).
         */
@@ -447,6 +526,50 @@ public class WindowTranslationTest {
                processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
        }
 
+       /**
+        * Test for the deprecated .apply(Reducer, WindowFunction).
+        */
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithPreReducerAndEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .apply(reducer, new 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, 
String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in.f0, in.f1));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(operator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+
        // 
------------------------------------------------------------------------
        //  Aggregate Translation Tests
        // 
------------------------------------------------------------------------
@@ -463,13 +586,13 @@ public class WindowTranslationTest {
                                .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .aggregate(new DummyAggregationFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = 
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
                                (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = 
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
                                (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
@@ -492,13 +615,13 @@ public class WindowTranslationTest {
                                
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
                                .aggregate(new DummyAggregationFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = 
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
                                (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = 
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
                                (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
@@ -529,7 +652,7 @@ public class WindowTranslationTest {
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = 
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
                                (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
@@ -569,6 +692,66 @@ public class WindowTranslationTest {
                                operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
        }
 
+       @Test
+       public void testAggregateWithProcessWindowFunctionEventTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .aggregate(new DummyAggregationFunction(), new 
TestProcessWindowFunction());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
+
+               processElementAndEnsureOutput(
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+       }
+
+       @Test
+       public void testAggregateWithProcessWindowFunctionProcessingTime() 
throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .aggregate(new DummyAggregationFunction(), new 
TestProcessWindowFunction());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
+
+               processElementAndEnsureOutput(
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+       }
+
        // 
------------------------------------------------------------------------
        //  Fold Translation Tests
        // 
------------------------------------------------------------------------
@@ -664,83 +847,269 @@ public class WindowTranslationTest {
        @SuppressWarnings("rawtypes")
        public void testFoldWithWindowFunctionProcessingTime() throws Exception 
{
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .fold(new Tuple3<>("", "empty", 0), new 
DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldWithProcessWindowFunctionEventTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .fold(new Tuple3<>("", "", 0), new 
DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(String key,
+                                                       Context ctx,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldWithProcessWindowFunctionProcessingTime() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .fold(new Tuple3<>("", "empty", 0), new 
DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(String key,
+                                                       Context ctx,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithPreFolderEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .apply(new Tuple3<>("", "", 0), new 
DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in. f1, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, 
Tuple3<String,String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyWithPreFolderAndEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .apply(new Tuple3<>("", "", 0), new 
DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(String key,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                               for (Tuple3<String, String, 
Integer> in : values) {
+                                                       out.collect(new 
Tuple3<>(in.f0, in. f1, in.f2));
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, 
Tuple3<String,String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+
+       // 
------------------------------------------------------------------------
+       //  Apply Translation Tests
+       // 
------------------------------------------------------------------------
+
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testApplyEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-               DataStream<Tuple2<String, Integer>> window = source
+               DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(new TupleKeySelector())
-                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-                               .fold(new Tuple3<>("", "empty", 0), new 
DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, 
Tuple2<String, Integer>, String, TimeWindow>() {
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
                                        public void apply(String key,
                                                        TimeWindow window,
-                                                       Iterable<Tuple3<String, 
String, Integer>> values,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
                                                        
Collector<Tuple2<String, Integer>> out) throws Exception {
-                                               for (Tuple3<String, String, 
Integer> in : values) {
-                                                       out.collect(new 
Tuple2<>(in.f0, in.f2));
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
                                                }
                                        }
                                });
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
                Assert.assertTrue(operator instanceof WindowOperator);
                WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
-               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
-               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
-               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
                processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
        }
 
        @Test
        @SuppressWarnings("rawtypes")
-       public void testApplyWithPreFolderEventTime() throws Exception {
+       public void testApplyProcessingTime() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-               DataStream<Tuple3<String, String, Integer>> window = source
+               DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(new TupleKeySelector())
-                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
-                               .apply(new Tuple3<>("", "", 0), new 
DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow>() {
+                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
                                        public void apply(String key,
                                                        TimeWindow window,
-                                                       Iterable<Tuple3<String, 
String, Integer>> values,
-                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
-                                               for (Tuple3<String, String, 
Integer> in : values) {
-                                                       out.collect(new 
Tuple3<>(in.f0, in. f1, in.f2));
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
                                                }
                                        }
                                });
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, 
Tuple3<String,String, Integer>> operator = transform.getOperator();
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
                Assert.assertTrue(operator instanceof WindowOperator);
                WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
-               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
-               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
-               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
FoldingStateDescriptor);
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
                processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
-       }
 
-       // 
------------------------------------------------------------------------
-       //  Apply Translation Tests
-       // 
------------------------------------------------------------------------
+       }
 
        @Test
        @SuppressWarnings("rawtypes")
-       public void testApplyEventTime() throws Exception {
+       public void testProcessEventTime() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
@@ -749,12 +1118,12 @@ public class WindowTranslationTest {
                DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(new TupleKeySelector())
                                .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+                               .process(new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
-                                       public void apply(String key,
-                                                       TimeWindow window,
+                                       public void process(String key,
+                                                       Context ctx,
                                                        Iterable<Tuple2<String, 
Integer>> values,
                                                        
Collector<Tuple2<String, Integer>> out) throws Exception {
                                                for (Tuple2<String, Integer> in 
: values) {
@@ -776,7 +1145,7 @@ public class WindowTranslationTest {
 
        @Test
        @SuppressWarnings("rawtypes")
-       public void testApplyProcessingTimeTime() throws Exception {
+       public void testProcessProcessingTime() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
@@ -785,12 +1154,12 @@ public class WindowTranslationTest {
                DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(new TupleKeySelector())
                                
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+                               .process(new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
-                                       public void apply(String key,
-                                                       TimeWindow window,
+                                       public void process(String key,
+                                                       Context ctx,
                                                        Iterable<Tuple2<String, 
Integer>> values,
                                                        
Collector<Tuple2<String, Integer>> out) throws Exception {
                                                for (Tuple2<String, Integer> in 
: values) {
@@ -903,6 +1272,43 @@ public class WindowTranslationTest {
 
        @Test
        @SuppressWarnings("rawtypes")
+       public void testProcessWithCustomTrigger() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .process(new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(String key,
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       @SuppressWarnings("rawtypes")
        public void testReduceWithEvictor() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
@@ -930,6 +1336,121 @@ public class WindowTranslationTest {
        }
 
        @Test
+       @SuppressWarnings("rawtypes")
+       public void testReduceWithEvictorAndProcessFunction() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .reduce(
+                                               reducer,
+                                               new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, 
TimeWindow>() {
+                                                       @Override
+                                                       public void process(
+                                                                       Tuple 
tuple,
+                                                                       Context 
context,
+                                                                       
Iterable<Tuple2<String, Integer>> elements,
+                                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                                               for 
(Tuple2<String, Integer> in : elements) {
+                                                                       
out.collect(in);
+                                                               }
+                                                       }
+                                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
CountEvictor);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       public void testAggregateWithEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .aggregate(new DummyAggregationFunction());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+       }
+
+       @Test
+       public void testAggregateWithEvictorAndProcessFunction() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .aggregate(
+                                               new DummyAggregationFunction(),
+                                               new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow>() {
+                                                       @Override
+                                                       public void process(
+                                                                       String 
s,
+                                                                       Context 
context,
+                                                                       
Iterable<Tuple2<String, Integer>> elements,
+                                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                                               for 
(Tuple2<String, Integer> in : elements) {
+                                                                       
out.collect(in);
+                                                               }
+                                                       }
+                                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+
+               Assert.assertTrue(operator instanceof WindowOperator);
+               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+       }
+
+
+       @Test
        @SuppressWarnings({"rawtypes", "unchecked"})
        public void testFoldWithEvictor() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -958,6 +1479,48 @@ public class WindowTranslationTest {
        }
 
        @Test
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       public void testFoldWithEvictorAndProcessFunction() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple3<String, String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .evictor(CountEvictor.of(100))
+                               .fold(
+                                               new Tuple3<>("", "", 1),
+                                               new DummyFolder(),
+                                               new 
ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, 
Integer>, Tuple, TimeWindow>() {
+                                                       @Override
+                                                       public void process(
+                                                                       Tuple 
tuple,
+                                                                       Context 
context,
+                                                                       
Iterable<Tuple3<String, String, Integer>> elements,
+                                                                       
Collector<Tuple3<String, String, Integer>> out) throws Exception {
+                                                               for 
(Tuple3<String, String, Integer> in : elements) {
+                                                                       
out.collect(in);
+                                                               }
+                                                       }
+                                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
CountEvictor);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               winOperator.setOutputType((TypeInformation) window1.getType(), 
new ExecutionConfig());
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
        @SuppressWarnings("rawtypes")
        public void testApplyWithEvictor() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -996,6 +1559,45 @@ public class WindowTranslationTest {
                processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
        }
 
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testProcessWithEvictor() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(new TupleKeySelector())
+                               .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .trigger(CountTrigger.of(1))
+                               .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
+                               .process(new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(String key,
+                                                       Context ctx,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (Tuple2<String, Integer> in 
: values) {
+                                                       out.collect(in);
+                                               }
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof EvictingWindowOperator);
+               EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> 
winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) 
operator;
+               Assert.assertTrue(winOperator.getTrigger() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator.getEvictor() instanceof 
TimeEvictor);
+               Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
+               Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
+
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+       }
+
        /**
         * Ensure that we get some output from the given operator when pushing 
in an element and
         * setting watermark and processing time to {@code Long.MAX_VALUE}.
@@ -1012,6 +1614,12 @@ public class WindowTranslationTest {
                                                keySelector,
                                                keyType);
 
+               if (operator instanceof OutputTypeConfigurable) {
+                       // use a dummy type since window functions just need 
the ExecutionConfig
+                       // this is also only needed for Fold, which we're 
getting rid of soon.
+                       ((OutputTypeConfigurable) 
operator).setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
+               }
+
                testHarness.open();
 
                testHarness.setProcessingTime(0);
@@ -1050,7 +1658,7 @@ public class WindowTranslationTest {
                }
        }
 
-       private static class DummyAggregationFunction 
+       private static class DummyAggregationFunction
                        implements AggregateFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
                @Override
@@ -1096,7 +1704,7 @@ public class WindowTranslationTest {
                }
        }
 
-       private static class TestWindowFunction 
+       private static class TestWindowFunction
                        implements WindowFunction<Tuple2<String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow> {
 
                @Override
@@ -1111,6 +1719,22 @@ public class WindowTranslationTest {
                }
        }
 
+       private static class TestProcessWindowFunction
+                       extends ProcessWindowFunction<Tuple2<String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow> {
+
+               @Override
+               public void process(String key,
+                               Context ctx,
+                               Iterable<Tuple2<String, Integer>> values,
+                               Collector<Tuple3<String, String, Integer>> out) 
throws Exception {
+
+                       for (Tuple2<String, Integer> in : values) {
+                               out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+                       }
+               }
+       }
+
+
        private static class TupleKeySelector implements 
KeySelector<Tuple2<String, Integer>, String> {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 4a20371..23293a6 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.api.scala.function.util
 
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, 
RuntimeContext}
-import org.apache.flink.api.java.operators.translation.WrappingFunction
 import 
org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => 
JProcessWindowFunction}
 import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -37,8 +35,7 @@ import scala.collection.JavaConverters._
   */
 final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
     private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
-    extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func)
-    with JProcessWindowFunctionTrait[IN, OUT, KEY, W] {
+    extends JProcessWindowFunction[IN, OUT, KEY, W] {
 
   override def process(
       key: KEY,
@@ -50,15 +47,4 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, 
W <: Window](
     }
     func.process(key, ctx, elements.asScala, out)
   }
-
-  override def getRuntimeContext: RuntimeContext = {
-    throw new RuntimeException("This should never be called")
-  }
-
-  override def getIterationRuntimeContext: IterationRuntimeContext = {
-    throw new RuntimeException("This should never be called")
-  }
 }
-
-private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W]
-  extends JProcessWindowFunction[IN, OUT, KEY, W]

Reply via email to