Repository: flink
Updated Branches:
  refs/heads/master 47b5cb795 -> f9eea5e5a


[FLINK-2674] Add Fold Window Operation for new Windowing API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9eea5e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9eea5e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9eea5e5

Branch: refs/heads/master
Commit: f9eea5e5a7d13d844e06d3eb6849268b3eaf4c08
Parents: ce792b1
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Oct 7 16:49:40 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  34 ++++
 .../api/datastream/WindowedStream.java          |  34 ++++
 .../windowing/FoldAllWindowFunction.java        |  97 ++++++++++
 .../functions/windowing/FoldWindowFunction.java |  97 ++++++++++
 .../windowing/NonKeyedWindowOperator.java       |  12 +-
 .../operators/windowing/WindowOperator.java     |  12 +-
 .../operators/windowing/WindowFoldITCase.java   | 191 +++++++++++++++++++
 .../streaming/api/scala/AllWindowedStream.scala |  43 ++++-
 .../streaming/api/scala/WindowedStream.scala    |  43 ++++-
 .../streaming/api/scala/WindowFoldITCase.scala  | 148 ++++++++++++++
 10 files changed, 707 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index a8d7654..c7a70d7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,6 +30,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -164,6 +166,38 @@ public class AllWindowedStream<T, W extends Window> {
        }
 
        /**
+        * Applies the given fold function to each window. The window function 
is called for each
+        * evaluation of the window for each key individually. The output of 
the fold function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * @param function The fold function.
+        * @return The data stream that is the result of applying the fold 
function to the window.
+        */
+       public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> 
function) {
+               //clean the closure
+               function = input.getExecutionEnvironment().clean(function);
+
+               TypeInformation<R> resultType = 
TypeExtractor.getFoldReturnTypes(function, input.getType(),
+                               Utils.getCallLocationName(), true);
+
+               return apply(new FoldAllWindowFunction<W, T, R>(initialValue, 
function), resultType);
+       }
+
+       /**
+        * Applies the given fold function to each window. The window function 
is called for each
+        * evaluation of the window for each key individually. The output of 
the fold function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * @param function The fold function.
+        * @return The data stream that is the result of applying the fold 
function to the window.
+        */
+       public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> 
function, TypeInformation<R> resultType) {
+               //clean the closure
+               function = input.getExecutionEnvironment().clean(function);
+               return apply(new FoldAllWindowFunction<W, T, R>(initialValue, 
function), resultType);
+       }
+
+       /**
         * Applies a window function to the window. The window function is 
called for each evaluation
         * of the window for each key individually. The output of the window 
function is interpreted
         * as a regular non-windowed stream.

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 99f7d06..42e0bd7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,6 +31,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -178,6 +180,38 @@ public class WindowedStream<T, K, W extends Window> {
        }
 
        /**
+        * Applies the given fold function to each window. The window function 
is called for each
+        * evaluation of the window for each key individually. The output of 
the fold function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * @param function The fold function.
+        * @return The data stream that is the result of applying the fold 
function to the window.
+        */
+       public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> 
function) {
+               //clean the closure
+               function = input.getExecutionEnvironment().clean(function);
+
+               TypeInformation<R> resultType = 
TypeExtractor.getFoldReturnTypes(function, input.getType(),
+                               Utils.getCallLocationName(), true);
+
+               return apply(new FoldWindowFunction<K, W, T, R>(initialValue, 
function), resultType);
+       }
+
+       /**
+        * Applies the given fold function to each window. The window function 
is called for each
+        * evaluation of the window for each key individually. The output of 
the fold function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * @param function The fold function.
+        * @return The data stream that is the result of applying the fold 
function to the window.
+        */
+       public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> 
function, TypeInformation<R> resultType) {
+               //clean the closure
+               function = input.getExecutionEnvironment().clean(function);
+               return apply(new FoldWindowFunction<K, W, T, R>(initialValue, 
function), resultType);
+       }
+
+       /**
         * Applies the given window function to each window. The window 
function is called for each
         * evaluation of the window for each key individually. The output of 
the window function is
         * interpreted as a regular non-windowed stream.

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
new file mode 100644
index 0000000..69f24fe
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class FoldAllWindowFunction<W extends Window, T, R>
+               extends WrappingFunction<FoldFunction<T, R>>
+               implements AllWindowFunction<T, R, W>, 
OutputTypeConfigurable<R> {
+       private static final long serialVersionUID = 1L;
+
+       private byte[] serializedInitialValue;
+       private TypeSerializer<R> outSerializer;
+       private transient R initialValue;
+
+       public FoldAllWindowFunction(R initialValue, FoldFunction<T, R> 
reduceFunction) {
+               super(reduceFunction);
+               this.initialValue = initialValue;
+       }
+
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               super.open(configuration);
+
+               if (serializedInitialValue == null) {
+                       throw new RuntimeException("No initial value was 
serialized for the fold " +
+                                       "window function. Probably the 
setOutputType method was not called.");
+               }
+
+               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
+               InputViewDataInputStreamWrapper in = new 
InputViewDataInputStreamWrapper(
+                               new DataInputStream(bais)
+               );
+               initialValue = outSerializer.deserialize(in);
+       }
+
+       @Override
+       public void apply(W window, Iterable<T> values, Collector<R> out) 
throws Exception {
+               R result = outSerializer.copy(initialValue);
+
+               for (T val: values) {
+                       result = wrappedFunction.fold(result, val);
+               }
+
+               out.collect(result);
+       }
+
+       @Override
+       public void setOutputType(TypeInformation<R> outTypeInfo, 
ExecutionConfig executionConfig) {
+               outSerializer = outTypeInfo.createSerializer(executionConfig);
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               OutputViewDataOutputStreamWrapper out = new 
OutputViewDataOutputStreamWrapper(
+                               new DataOutputStream(baos)
+               );
+
+               try {
+                       outSerializer.serialize(initialValue, out);
+               } catch (IOException ioe) {
+                       throw new RuntimeException("Unable to serialize initial 
value of type " +
+                                       initialValue.getClass().getSimpleName() 
+ " of fold window function.", ioe);
+               }
+
+               serializedInitialValue = baos.toByteArray();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
new file mode 100644
index 0000000..04d2ac7
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class FoldWindowFunction<K, W extends Window, T, R>
+               extends WrappingFunction<FoldFunction<T, R>>
+               implements WindowFunction<T, R, K, W>, 
OutputTypeConfigurable<R> {
+       private static final long serialVersionUID = 1L;
+
+       private byte[] serializedInitialValue;
+       private TypeSerializer<R> outSerializer;
+       private transient R initialValue;
+
+       public FoldWindowFunction(R initialValue, FoldFunction<T, R> 
reduceFunction) {
+               super(reduceFunction);
+               this.initialValue = initialValue;
+       }
+
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               super.open(configuration);
+
+               if (serializedInitialValue == null) {
+                       throw new RuntimeException("No initial value was 
serialized for the fold " +
+                                       "window function. Probably the 
setOutputType method was not called.");
+               }
+
+               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
+               InputViewDataInputStreamWrapper in = new 
InputViewDataInputStreamWrapper(
+                               new DataInputStream(bais)
+               );
+               initialValue = outSerializer.deserialize(in);
+       }
+
+       @Override
+       public void apply(K k, W window, Iterable<T> values, Collector<R> out) 
throws Exception {
+               R result = outSerializer.copy(initialValue);
+
+               for (T val: values) {
+                       result = wrappedFunction.fold(result, val);
+               }
+
+               out.collect(result);
+       }
+
+       @Override
+       public void setOutputType(TypeInformation<R> outTypeInfo, 
ExecutionConfig executionConfig) {
+               outSerializer = outTypeInfo.createSerializer(executionConfig);
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               OutputViewDataOutputStreamWrapper out = new 
OutputViewDataOutputStreamWrapper(
+                               new DataOutputStream(baos)
+               );
+
+               try {
+                       outSerializer.serialize(initialValue, out);
+               } catch (IOException ioe) {
+                       throw new RuntimeException("Unable to serialize initial 
value of type " +
+                                       initialValue.getClass().getSimpleName() 
+ " of fold window function.", ioe);
+               }
+
+               serializedInitialValue = baos.toByteArray();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index a80242d..e6aa53b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -56,7 +57,7 @@ import java.util.Set;
  */
 public class NonKeyedWindowOperator<IN, OUT, W extends Window>
                extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, 
OUT, W>>
-               implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
+               implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable, OutputTypeConfigurable<OUT> {
 
        private static final long serialVersionUID = 1L;
 
@@ -268,6 +269,15 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                return this;
        }
 
+       @Override
+       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+               if (userFunction instanceof OutputTypeConfigurable) {
+                       @SuppressWarnings("unchecked")
+                       OutputTypeConfigurable<OUT> typeConfigurable = 
(OutputTypeConfigurable<OUT>) userFunction;
+                       typeConfigurable.setOutputType(outTypeInfo, 
executionConfig);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        // Getters for testing
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 368b8fa..7762101 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -74,7 +75,7 @@ import java.util.Set;
  */
 public class WindowOperator<K, IN, OUT, W extends Window>
                extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, 
K, W>>
-               implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
+               implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable, OutputTypeConfigurable<OUT> {
 
        private static final long serialVersionUID = 1L;
 
@@ -342,6 +343,15 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                return this;
        }
 
+       @Override
+       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+               if (userFunction instanceof OutputTypeConfigurable) {
+                       @SuppressWarnings("unchecked")
+                       OutputTypeConfigurable<OUT> typeConfigurable = 
(OutputTypeConfigurable<OUT>) userFunction;
+                       typeConfigurable.setOutputType(outTypeInfo, 
executionConfig);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        // Getters for testing
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
new file mode 100644
index 0000000..45649bd
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
@@ -0,0 +1,191 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for Folds over windows. These also test whether 
OutputTypeConfigurable functions
+ * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
+ */
+public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
+
+       private static List<String> testResults;
+
+       @Test
+       public void testFoldWindow() throws Exception {
+
+               testResults = Lists.newArrayList();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(1);
+
+               DataStream<Tuple2<String, Integer>> source1 = env.addSource(new 
SourceFunction<Tuple2<String, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Tuple2<String, Integer>> 
ctx) throws Exception {
+                               ctx.collect(Tuple2.of("a", 0));
+                               ctx.collect(Tuple2.of("a", 1));
+                               ctx.collect(Tuple2.of("a", 2));
+
+                               ctx.collect(Tuple2.of("b", 3));
+                               ctx.collect(Tuple2.of("b", 4));
+                               ctx.collect(Tuple2.of("b", 5));
+
+                               ctx.collect(Tuple2.of("a", 6));
+                               ctx.collect(Tuple2.of("a", 7));
+                               ctx.collect(Tuple2.of("a", 8));
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               }).extractTimestamp(new Tuple2TimestampExtractor());
+
+               source1
+                               .keyBy(0)
+                               .window(TumblingTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+                               .fold(Tuple2.of("R:", 0), new 
FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+                                       @Override
+                                       public Tuple2<String, Integer> 
fold(Tuple2<String, Integer> accumulator,
+                                                       Tuple2<String, Integer> 
value) throws Exception {
+                                               accumulator.f0 += value.f0;
+                                               accumulator.f1 += value.f1;
+                                               return accumulator;
+                                       }
+                               })
+                               .addSink(new SinkFunction<Tuple2<String, 
Integer>>() {
+                                       @Override
+                                       public void invoke(Tuple2<String, 
Integer> value) throws Exception {
+                                               
testResults.add(value.toString());
+                                       }
+                               });
+
+               env.execute("Fold Window Test");
+
+               List<String> expectedResult = Lists.newArrayList(
+                               "(R:aaa,3)",
+                               "(R:aaa,21)",
+                               "(R:bbb,12)");
+
+               Collections.sort(expectedResult);
+               Collections.sort(testResults);
+
+               Assert.assertEquals(expectedResult, testResults);
+       }
+
+       @Test
+       public void testFoldAllWindow() throws Exception {
+
+               testResults = Lists.newArrayList();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(1);
+
+               DataStream<Tuple2<String, Integer>> source1 = env.addSource(new 
SourceFunction<Tuple2<String, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Tuple2<String, Integer>> 
ctx) throws Exception {
+                               ctx.collect(Tuple2.of("a", 0));
+                               ctx.collect(Tuple2.of("a", 1));
+                               ctx.collect(Tuple2.of("a", 2));
+
+                               ctx.collect(Tuple2.of("b", 3));
+                               ctx.collect(Tuple2.of("a", 3));
+                               ctx.collect(Tuple2.of("b", 4));
+                               ctx.collect(Tuple2.of("a", 4));
+                               ctx.collect(Tuple2.of("b", 5));
+                               ctx.collect(Tuple2.of("a", 5));
+
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               }).extractTimestamp(new Tuple2TimestampExtractor());
+
+               source1
+                               .windowAll(TumblingTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+                               .fold(Tuple2.of("R:", 0), new 
FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+                                       @Override
+                                       public Tuple2<String, Integer> 
fold(Tuple2<String, Integer> accumulator,
+                                                       Tuple2<String, Integer> 
value) throws Exception {
+                                               accumulator.f0 += value.f0;
+                                               accumulator.f1 += value.f1;
+                                               return accumulator;
+                                       }
+                               })
+                               .addSink(new SinkFunction<Tuple2<String, 
Integer>>() {
+                                       @Override
+                                       public void invoke(Tuple2<String, 
Integer> value) throws Exception {
+                                               
testResults.add(value.toString());
+                                       }
+                               });
+
+               env.execute("Fold All-Window Test");
+
+               List<String> expectedResult = Lists.newArrayList(
+                               "(R:aaa,3)",
+                               "(R:bababa,24)");
+
+               Collections.sort(expectedResult);
+               Collections.sort(testResults);
+
+               Assert.assertEquals(expectedResult, testResults);
+       }
+
+       private static class Tuple2TimestampExtractor implements 
TimestampExtractor<Tuple2<String, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public long extractTimestamp(Tuple2<String, Integer> element, 
long currentTimestamp) {
+                       return element.f1;
+               }
+
+               @Override
+               public long emitWatermark(Tuple2<String, Integer> element, long 
currentTimestamp) {
+                       return element.f1 - 1;
+               }
+
+               @Override
+               public long getCurrentWatermark() {
+                       return Long.MIN_VALUE;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index d2d0a1d..65cafb7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => 
JavaAllWStream}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
@@ -122,6 +122,47 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
   }
 
   /**
+   * Applies the given fold function to each window. The fold function is 
called for each
+   * evaluation of the window, over all of its elements. The output of the 
fold function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * @param function The fold function.
+   * @return The data stream that is the result of applying the fold function 
to the window.
+   */
+  def fold[R: TypeInformation: ClassTag](
+      initialValue: R,
+      function: FoldFunction[T,R]): DataStream[R] = {
+    if (function == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+
+    val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+    javaStream.fold(initialValue, function, resultType)
+  }
+
+  /**
+   * Applies the given fold function to each window. The fold function is 
called for each
+   * evaluation of the window, over all of its elements. The output of the 
fold function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * @param function The fold function.
+   * @return The data stream that is the result of applying the fold function 
to the window.
+   */
+  def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => 
R): DataStream[R] = {
+    if (function == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    val cleanFun = clean(function)
+    val folder = new FoldFunction[T,R] {
+      def fold(acc: R, v: T) = {
+        cleanFun(acc, v)
+      }
+    }
+    fold(initialValue, folder)
+  }
+
+  /**
    * Applies the given window function to each window. The window function is 
called for each
    * evaluation of the window for each key individually. The output of the 
window function is
    * interpreted as a regular non-windowed stream.

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 3963765..a8ddaf8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{WindowedStream => 
JavaWStream}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
@@ -125,6 +125,47 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
   }
 
   /**
+   * Applies the given fold function to each window. The fold function is 
called for each
+   * evaluation of the window for each key individually. The output of the 
fold function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * @param function The fold function.
+   * @return The data stream that is the result of applying the fold function 
to the window.
+   */
+  def fold[R: TypeInformation: ClassTag](
+      initialValue: R,
+      function: FoldFunction[T,R]): DataStream[R] = {
+    if (function == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+
+    val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+    javaStream.fold(initialValue, function, resultType)
+  }
+
+  /**
+   * Applies the given fold function to each window. The fold function is 
called for each
+   * evaluation of the window for each key individually. The output of the 
fold function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * @param function The fold function.
+   * @return The data stream that is the result of applying the fold function 
to the window.
+   */
+  def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => 
R): DataStream[R] = {
+    if (function == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    val cleanFun = clean(function)
+    val folder = new FoldFunction[T,R] {
+      def fold(acc: R, v: T) = {
+        cleanFun(acc, v)
+      }
+    }
+    fold(initialValue, folder)
+  }
+
+  /**
    * Applies the given window function to each window. The window function is 
called for each
    * evaluation of the window for each key individually. The output of the 
window function is
    * interpreted as a regular non-windowed stream.

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
new file mode 100644
index 0000000..dd098a0
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.TimestampExtractor
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.mutable
+
+/**
+ * Tests for Folds over windows. These also test whether 
OutputTypeConfigurable functions
+ * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
+ */
+class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testFoldWindow(): Unit = {
+    WindowFoldITCase.testResults = mutable.MutableList()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+      }
+
+      def cancel() {
+      }
+    }).extractTimestamp(new WindowFoldITCase.Tuple2TimestampExtractor)
+
+    source1
+      .keyBy(0)
+      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + 
v._1, acc._2 + v._2) })
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+        WindowFoldITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Fold Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(R:aaa,3)",
+      "(R:aaa,21)",
+      "(R:bbb,12)")
+
+    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+  }
+
+  @Test
+  def testFoldAllWindow(): Unit = {
+    WindowFoldITCase.testResults = mutable.MutableList()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("a", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("a", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 5))
+      }
+
+      def cancel() {
+      }
+    }).extractTimestamp(new WindowFoldITCase.Tuple2TimestampExtractor)
+
+    source1
+      .windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + 
v._1, acc._2 + v._2) })
+      .addSink(new SinkFunction[(String, Int)]() {
+      def invoke(value: (String, Int)) {
+        WindowFoldITCase.testResults += value.toString
+      }
+    })
+
+    env.execute("Fold All-Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(R:aaa,3)",
+      "(R:bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+  }
+
+}
+
+
+object WindowFoldITCase {
+  private var testResults: mutable.MutableList[String] = null
+
+  private class Tuple2TimestampExtractor extends TimestampExtractor[(String, 
Int)] {
+    def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long 
= {
+      element._2
+    }
+
+    def emitWatermark(element: (String, Int), currentTimestamp: Long): Long = {
+      element._2 - 1
+    }
+
+    def getCurrentWatermark: Long = {
+      Long.MinValue
+    }
+  }
+}

Reply via email to