http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 104bc7b..a9c3ef6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -59,7 +60,9 @@ import 
org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -432,6 +435,78 @@ public class WindowOperatorTest extends TestLogger {
 
        @Test
        @SuppressWarnings("unchecked")
+       public void testSessionWindowsWithProcessFunction() throws Exception {
+               closeCalled.set(0);
+
+               final int SESSION_SIZE = 3;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                               inputType.createSerializer(new 
ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
+                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               stateDesc,
+                               new InternalIterableProcessWindowFunction<>(new 
SessionProcessWindowFunction()),
+                               EventTimeTrigger.create(),
+                               0);
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               // add elements out-of-order
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 0));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 2), 1000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3), 2500));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 10));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 1000));
+
+               // do a snapshot, close and restore again
+               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               testHarness.close();
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), 2500));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 4), 5501));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 5), 6000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 5), 6000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 6), 6050));
+
+               testHarness.processWatermark(new Watermark(12000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 
10L, 5500L), 5499));
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 
0L, 5500L), 5499));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 
5501L, 9050L), 9049));
+               expectedOutput.add(new Watermark(12000));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 10), 15000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 20), 15000));
+
+               testHarness.processWatermark(new Watermark(17999));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 
15000L, 18000L), 17999));
+               expectedOutput.add(new Watermark(17999));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
+
+               testHarness.close();
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
        public void testReduceSessionWindows() throws Exception {
                closeCalled.set(0);
 
@@ -500,6 +575,76 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.close();
        }
 
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testReduceSessionWindowsWithProcessFunction() throws 
Exception {
+               closeCalled.set(0);
+
+               final int SESSION_SIZE = 3;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>(
+                               "window-contents", new SumReducer(), 
inputType.createSerializer(new ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new 
WindowOperator<>(
+                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               stateDesc,
+                               new 
InternalSingleValueProcessWindowFunction<>(new 
ReducedProcessSessionWindowFunction()),
+                               EventTimeTrigger.create(),
+                               0);
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               // add elements out-of-order
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 0));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 2), 1000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3), 2500));
+
+               // do a snapshot, close and restore again
+               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               testHarness.close();
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 10));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 1000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), 2500));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 4), 5501));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 5), 6000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 5), 6000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 6), 6050));
+
+               testHarness.processWatermark(new Watermark(12000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 
10L, 5500L), 5499));
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 
0L, 5500L), 5499));
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 
5501L, 9050L), 9049));
+               expectedOutput.add(new Watermark(12000));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 10), 15000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 20), 15000));
+
+               testHarness.processWatermark(new Watermark(17999));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 
15000L, 18000L), 17999));
+               expectedOutput.add(new Watermark(17999));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
+
+               testHarness.close();
+       }
+
        /**
         * This tests whether merging works correctly with the CountTrigger.
         * @throws Exception
@@ -2379,6 +2524,38 @@ public class WindowOperatorTest extends TestLogger {
                }
        }
 
+       public static class SessionProcessWindowFunction extends 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, 
String, TimeWindow> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void process(String key,
+                               Context context,
+                               Iterable<Tuple2<String, Integer>> values,
+                               Collector<Tuple3<String, Long, Long>> out) 
throws Exception {
+                       int sum = 0;
+                       for (Tuple2<String, Integer> i: values) {
+                               sum += i.f1;
+                       }
+                       String resultString = key + "-" + sum;
+                       TimeWindow window = context.window();
+                       out.collect(new Tuple3<>(resultString, 
window.getStart(), window.getEnd()));
+               }
+       }
+
+       public static class ReducedProcessSessionWindowFunction extends 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, 
String, TimeWindow> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void process(String key,
+                               Context context,
+                               Iterable<Tuple2<String, Integer>> values,
+                               Collector<Tuple3<String, Long, Long>> out) 
throws Exception {
+                       TimeWindow window = context.window();
+                       for (Tuple2<String, Integer> val: values) {
+                               out.collect(new Tuple3<>(key + "-" + val.f1, 
window.getStart(), window.getEnd()));
+                       }
+               }
+       }
 
        public static class PointSessionWindows extends EventTimeSessionWindows 
{
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 1e3e3d5..7d37d1a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -18,17 +18,22 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -114,6 +119,79 @@ public class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
        }
 
        @Test
+       public void testFoldProcessWindow() throws Exception {
+
+               testResults = new ArrayList<>();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(1);
+
+               DataStream<Tuple2<String, Integer>> source1 = env.addSource(new 
SourceFunction<Tuple2<String, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Tuple2<String, Integer>> 
ctx) throws Exception {
+                               ctx.collect(Tuple2.of("a", 0));
+                               ctx.collect(Tuple2.of("a", 1));
+                               ctx.collect(Tuple2.of("a", 2));
+
+                               ctx.collect(Tuple2.of("b", 3));
+                               ctx.collect(Tuple2.of("b", 4));
+                               ctx.collect(Tuple2.of("b", 5));
+
+                               ctx.collect(Tuple2.of("a", 6));
+                               ctx.collect(Tuple2.of("a", 7));
+                               ctx.collect(Tuple2.of("a", 8));
+
+                               // source is finite, so it will have an 
implicit MAX watermark when it finishes
+                       }
+
+                       @Override
+                       public void cancel() {}
+
+               }).assignTimestampsAndWatermarks(new 
Tuple2TimestampExtractor());
+
+               source1
+                       .keyBy(0)
+                       .window(TumblingEventTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+                       .fold(Tuple2.of(0, "R:"), new 
FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
+                               @Override
+                               public Tuple2<Integer, String> 
fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws 
Exception {
+                                       accumulator.f1 += value.f0;
+                                       accumulator.f0 += value.f1;
+                                       return accumulator;
+                               }
+                       }, new ProcessWindowFunction<Tuple2<Integer, String>, 
Tuple3<String, Integer, Integer>, Tuple, TimeWindow>() {
+                               @Override
+                               public void process(Tuple tuple, Context 
context, Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, 
Integer, Integer>> out) throws Exception {
+                                       int i = 0;
+                                       for (Tuple2<Integer, String> in : 
elements) {
+                                               out.collect(new Tuple3<>(in.f1, 
in.f0, i++));
+                                       }
+                               }
+                       })
+                       .addSink(new SinkFunction<Tuple3<String, Integer, 
Integer>>() {
+                               @Override
+                               public void invoke(Tuple3<String, Integer, 
Integer> value) throws Exception {
+                                       testResults.add(value.toString());
+                               }
+                       });
+
+               env.execute("Fold Process Window Test");
+
+               List<String> expectedResult = Arrays.asList(
+                       "(R:aaa,3,0)",
+                       "(R:aaa,21,0)",
+                       "(R:bbb,12,0)");
+
+               Collections.sort(expectedResult);
+               Collections.sort(testResults);
+
+               Assert.assertEquals(expectedResult, testResults);
+       }
+
+       @Test
        public void testFoldAllWindow() throws Exception {
 
                testResults = new ArrayList<>();

Reply via email to