[FLINK-2991] Add Folding State and use in WindowOperator

This enables efficient incremental aggregation of fold windows.

This also adds:
- WindowedStream.apply(initVal, foldFunction, windowFunction)
- AllWindowedStream.apply(initVal, foldFunction, windowFunction)

This closes #1605


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94cba899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94cba899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94cba899

Branch: refs/heads/master
Commit: 94cba8998c726092e2cc80fd022ca40bf0c38ec2
Parents: d93b154
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Feb 8 14:56:19 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 12 18:51:01 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBFoldingState.java    | 187 ++++++++++++++++++
 .../streaming/state/RocksDBListState.java       |   9 +-
 .../streaming/state/RocksDBReducingState.java   |  14 +-
 .../streaming/state/RocksDBStateBackend.java    |  20 +-
 .../streaming/state/RocksDBValueState.java      |  14 +-
 .../contrib/streaming/state/DbStateBackend.java |  18 ++
 .../flink/api/common/state/FoldingState.java    |  37 ++++
 .../common/state/FoldingStateDescriptor.java    | 108 ++++++++++
 .../flink/api/common/state/StateBackend.java    |   8 +
 .../flink/api/common/state/StateDescriptor.java |   2 +-
 .../runtime/state/AbstractStateBackend.java     |  20 ++
 .../runtime/state/GenericFoldingState.java      | 133 +++++++++++++
 .../flink/runtime/state/GenericListState.java   |   6 +
 .../runtime/state/GenericReducingState.java     |   7 +
 .../state/filesystem/FsFoldingState.java        | 145 ++++++++++++++
 .../state/filesystem/FsStateBackend.java        |   7 +
 .../runtime/state/memory/MemFoldingState.java   | 118 +++++++++++
 .../state/memory/MemoryStateBackend.java        |   7 +
 .../runtime/state/StateBackendTestBase.java     | 105 ++++++++++
 .../api/datastream/AllWindowedStream.java       | 103 ++++++++--
 .../api/datastream/WindowedStream.java          | 198 +++++++++++++------
 .../windowing/FoldAllWindowFunction.java        |  92 ---------
 .../windowing/FoldApplyAllWindowFunction.java   |  95 +++++++++
 .../windowing/FoldApplyWindowFunction.java      |  95 +++++++++
 .../functions/windowing/FoldWindowFunction.java |  91 ---------
 .../windowing/PassThroughAllWindowFunction.java |  30 +++
 .../windowing/PassThroughWindowFunction.java    |  30 +++
 .../windowing/ReduceAllWindowFunction.java      |  30 ---
 .../windowing/ReduceWindowFunction.java         |  30 ---
 .../ReduceWindowFunctionWithWindow.java         |  31 ---
 .../operators/FoldApplyWindowFunctionTest.java  | 143 ++++++++++++++
 .../api/operators/FoldWindowFunctionTest.java   | 132 -------------
 .../operators/windowing/WindowOperatorTest.java |  10 +-
 .../runtime/state/StateBackendITCase.java       |   8 +
 .../streaming/api/scala/AllWindowedStream.scala |  60 ++++++
 .../streaming/api/scala/WindowedStream.scala    |  60 ++++++
 36 files changed, 1707 insertions(+), 496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
new file mode 100644
index 0000000..7e4e573
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link FoldingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ * @param <Backend> The type of the backend that snapshots this key/value 
state.
+ */
+public class RocksDBFoldingState<K, N, T, ACC, Backend extends 
AbstractStateBackend>
+       extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, Backend>
+       implements FoldingState<T, ACC> {
+
+       /** Serializer for the values */
+       private final TypeSerializer<ACC> valueSerializer;
+
+       /** This holds the name of the state and can create an initial default 
value for the state. */
+       protected final FoldingStateDescriptor<T, ACC> stateDesc;
+
+       /** User-specified fold function */
+       private final FoldFunction<T, ACC> foldFunction;
+
+       /**
+        * Creates a new {@code RocksDBFoldingState}.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains 
name
+        *                     and can create a default state value.
+        * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        * @param backupPath The path where to store backups.
+        */
+       protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc,
+               File dbPath,
+               String backupPath) {
+               super(keySerializer, namespaceSerializer, dbPath, backupPath);
+               this.stateDesc = requireNonNull(stateDesc);
+               this.valueSerializer = stateDesc.getSerializer();
+               this.foldFunction = stateDesc.getFoldFunction();
+       }
+
+       /**
+        * Creates a {@code RocksDBFoldingState} by restoring from a directory.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains 
name
+        *                     and can create a default state value.
+        * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        * @param backupPath The path where to store backups.
+        * @param restorePath The path on the local file system that we are 
restoring from.
+        */
+       protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc,
+               File dbPath,
+               String backupPath,
+               String restorePath) {
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
restorePath);
+               this.stateDesc = stateDesc;
+               this.valueSerializer = stateDesc.getSerializer();
+               this.foldFunction = stateDesc.getFoldFunction();
+       }
+
+       @Override
+       public ACC get() {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+               try {
+                       writeKeyAndNamespace(out);
+                       byte[] key = baos.toByteArray();
+                       byte[] valueBytes = db.get(key);
+                       if (valueBytes == null) {
+                               return stateDesc.getDefaultValue();
+                       }
+                       return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+               } catch (IOException|RocksDBException e) {
+                       throw new RuntimeException("Error while retrieving data 
from RocksDB", e);
+               }
+       }
+
+       @Override
+       public void add(T value) throws IOException {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+               try {
+                       writeKeyAndNamespace(out);
+                       byte[] key = baos.toByteArray();
+                       byte[] valueBytes = db.get(key);
+
+                       if (valueBytes == null) {
+                               baos.reset();
+                               
valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), 
value), out);
+                               db.put(key, baos.toByteArray());
+                       } else {
+                               ACC oldValue = valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+                               ACC newValue = foldFunction.fold(oldValue, 
value);
+                               baos.reset();
+                               valueSerializer.serialize(newValue, out);
+                               db.put(key, baos.toByteArray());
+                       }
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while adding data to 
RocksDB", e);
+               }
+       }
+
+       @Override
+       protected KvStateSnapshot<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, Backend> createRocksDBSnapshot(
+               URI backupUri,
+               long checkpointId) {
+               return new Snapshot<>(dbPath, checkpointPath, backupUri, 
checkpointId, keySerializer, namespaceSerializer, stateDesc);
+       }
+
+       private static class Snapshot<K, N, T, ACC, Backend extends 
AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, 
ACC>, FoldingStateDescriptor<T, ACC>, Backend> {
+               private static final long serialVersionUID = 1L;
+
+               public Snapshot(File dbPath,
+                       String checkpointPath,
+                       URI backupUri,
+                       long checkpointId,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDesc) {
+                       super(dbPath,
+                               checkpointPath,
+                               backupUri,
+                               checkpointId,
+                               keySerializer,
+                               namespaceSerializer,
+                               stateDesc);
+               }
+
+               @Override
+               protected KvState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, Backend> createRocksDBState(
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       String restorePath) throws Exception {
+                       return new RocksDBFoldingState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index da07f75..6c55566 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -62,8 +62,9 @@ public class RocksDBListState<K, N, V>
         * @param keySerializer The serializer for the keys.
         * @param namespaceSerializer The serializer for the namespace.
         * @param stateDesc The state identifier for the state. This contains 
name
-        *                           and can create a default state value.
+        *                     and can create a default state value.
         * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        * @param backupPath The path where to store backups.
         */
        protected RocksDBListState(TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer,
@@ -78,13 +79,15 @@ public class RocksDBListState<K, N, V>
        }
 
        /**
-        * Creates a new {@code RocksDBListState}.
+        * Creates a {@code RocksDBListState} by restoring from a directory.
         *
         * @param keySerializer The serializer for the keys.
         * @param namespaceSerializer The serializer for the namespace.
         * @param stateDesc The state identifier for the state. This contains 
name
-        *                           and can create a default state value.
+        *                     and can create a default state value.
         * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        * @param backupPath The path where to store backups.
+        * @param restorePath The path on the local file system that we are 
restoring from.
         */
        protected RocksDBListState(TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 81f9ffb..b7ba3c7 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -63,8 +63,9 @@ public class RocksDBReducingState<K, N, V>
         * @param keySerializer The serializer for the keys.
         * @param namespaceSerializer The serializer for the namespace.
         * @param stateDesc The state identifier for the state. This contains 
name
-        *                           and can create a default state value.
+        *                     and can create a default state value.
         * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        * @param backupPath The path where to store backups.
         */
        protected RocksDBReducingState(TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer,
@@ -79,6 +80,17 @@ public class RocksDBReducingState<K, N, V>
                this.reduceFunction = stateDesc.getReduceFunction();
        }
 
+       /**
+        * Creates a {@code RocksDBReducingState} by restoring from a directory.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains 
name
+        *                     and can create a default state value.
+        * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        * @param backupPath The path where to store backups.
+        * @param restorePath The path on the local file system that we are 
restoring from.
+        */
        protected RocksDBReducingState(TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer,
                        ReducingStateDescriptor<V> stateDesc,

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 5b16e86..b323c5e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -242,7 +244,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        
        @Override
        protected <N, T> ValueState<T> createValueState(TypeSerializer<N> 
namespaceSerializer,
-               ValueStateDescriptor<T> stateDesc) throws Exception {
+                       ValueStateDescriptor<T> stateDesc) throws Exception {
+
                File dbPath = getDbPath(stateDesc.getName());
                String checkpointPath = getCheckpointPath(stateDesc.getName());
                
@@ -252,7 +255,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
 
        @Override
        protected <N, T> ListState<T> createListState(TypeSerializer<N> 
namespaceSerializer,
-               ListStateDescriptor<T> stateDesc) throws Exception {
+                       ListStateDescriptor<T> stateDesc) throws Exception {
+
                File dbPath = getDbPath(stateDesc.getName());
                String checkpointPath = getCheckpointPath(stateDesc.getName());
                
@@ -262,7 +266,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
 
        @Override
        protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> 
namespaceSerializer,
-               ReducingStateDescriptor<T> stateDesc) throws Exception {
+                       ReducingStateDescriptor<T> stateDesc) throws Exception {
+
                File dbPath = getDbPath(stateDesc.getName());
                String checkpointPath = getCheckpointPath(stateDesc.getName());
                
@@ -271,6 +276,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
+       protected <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDesc) throws 
Exception {
+
+               File dbPath = getDbPath(stateDesc.getName());
+               String checkpointPath = getCheckpointPath(stateDesc.getName());
+               return new RocksDBFoldingState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath);
+       }
+
+       @Override
        public CheckpointStateOutputStream createCheckpointStateOutputStream(
                        long checkpointID, long timestamp) throws Exception {
                

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 388f099..7a19153 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -54,13 +54,14 @@ public class RocksDBValueState<K, N, V>
        protected final ValueStateDescriptor<V> stateDesc;
 
        /**
-        * Creates a new {@code RocksDBReducingState}.
+        * Creates a new {@code RocksDBValueState}.
         *
         * @param keySerializer The serializer for the keys.
         * @param namespaceSerializer The serializer for the namespace.
         * @param stateDesc The state identifier for the state. This contains 
name
         *                           and can create a default state value.
         * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        * @param backupPath The path where to store backups.
         */
        protected RocksDBValueState(TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer,
@@ -74,6 +75,17 @@ public class RocksDBValueState<K, N, V>
                this.valueSerializer = stateDesc.getSerializer();
        }
 
+       /**
+        * Creates a {@code RocksDBValueState} by restoring from a directory.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains 
name
+        *                           and can create a default state value.
+        * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        * @param backupPath The path where to store backups.
+        * @param restorePath The path on the local file system that we are 
restoring from.
+        */
        protected RocksDBValueState(TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer,
                        ValueStateDescriptor<V> stateDesc,

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index 5162983..d82bfb2 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -27,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.GenericFoldingState;
 import org.apache.flink.runtime.state.GenericListState;
 import org.apache.flink.runtime.state.GenericReducingState;
 import org.apache.flink.runtime.state.StateHandle;
@@ -241,6 +244,21 @@ public class DbStateBackend extends AbstractStateBackend {
        }
 
        @Override
+       protected <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDesc) throws 
Exception {
+
+               if (!stateDesc.isSerializerInitialized()) {
+                       throw new IllegalArgumentException("state descriptor 
serializer not initialized");
+               }
+
+               ValueStateDescriptor<ACC> valueStateDescriptor = new 
ValueStateDescriptor<>(
+                       stateDesc.getName(), stateDesc.getSerializer(), 
stateDesc.getDefaultValue());
+
+               ValueState<ACC> valueState = 
createValueState(namespaceSerializer, valueStateDescriptor);
+               return new GenericFoldingState<>(valueState, 
stateDesc.getFoldFunction());
+       }
+
+       @Override
        public void initializeForJob(final Environment env,
                String operatorIdentifier,
                TypeSerializer<?> keySerializer) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
new file mode 100644
index 0000000..d328c2e
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+/**
+ * {@link State} interface for folding state. Elements can be added to the 
state; they are
+ * successively folded into the initial value using a
+ * {@link org.apache.flink.api.common.functions.FoldFunction}. The current 
state can be inspected.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed 
consistently
+ * by the system as part of the distributed snapshots.
+ * 
+ * <p>The state is only accessible by functions applied on a KeyedDataStream. 
The key is
+ * automatically supplied by the system, so the function always sees the value 
mapped to the
+ * key of the current element. That way, the system can handle stream and 
state partitioning
+ * consistently together.
+ * 
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ */
+public interface FoldingState<T, ACC> extends MergingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
new file mode 100644
index 0000000..52ad712
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link StateDescriptor} for {@link FoldingState}. This can be used to 
create partitioned
+ * folding state.
+ *
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ */
+public class FoldingStateDescriptor<T, ACC> extends 
StateDescriptor<FoldingState<T, ACC>, ACC> {
+       private static final long serialVersionUID = 1L;
+
+
+       private final FoldFunction<T, ACC> foldFunction;
+
+       /**
+        * Creates a new {@code FoldingStateDescriptor} with the given name, 
type, and initial value.
+        *
+        * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+        * consider using the {@link #FoldingStateDescriptor(String, ACC, 
FoldFunction, TypeInformation)} constructor.
+        *
+        * @param name The (unique) name for the state.
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The {@code FoldFunction} used to aggregate the 
state.
+        * @param typeClass The type of the values in the state.
+        */
+       public FoldingStateDescriptor(String name, ACC initialValue, 
FoldFunction<T, ACC> foldFunction, Class<ACC> typeClass) {
+               super(name, typeClass, initialValue);
+               this.foldFunction = requireNonNull(foldFunction);
+
+               if (foldFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
of FoldingState can not be a RichFunction.");
+               }
+       }
+
+       /**
+        * Creates a new {@code FoldingStateDescriptor} with the given name and 
default value.
+        *
+        * @param name The (unique) name for the state.
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The {@code FoldFunction} used to aggregate the 
state.
+        * @param typeInfo The type of the values in the state.
+        */
+       public FoldingStateDescriptor(String name, ACC initialValue, 
FoldFunction<T, ACC> foldFunction, TypeInformation<ACC> typeInfo) {
+               super(name, typeInfo, initialValue);
+               this.foldFunction = requireNonNull(foldFunction);
+
+               if (foldFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
of FoldingState can not be a RichFunction.");
+               }
+       }
+
+       /**
+        * Creates a new {@code FoldingStateDescriptor} with the given name and 
default value.
+        *
+        * @param name The (unique) name for the state.
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The {@code FoldFunction} used to aggregate the 
state.
+        * @param typeSerializer The type serializer of the values in the state.
+        */
+       public FoldingStateDescriptor(String name, ACC initialValue, 
FoldFunction<T, ACC> foldFunction, TypeSerializer<ACC> typeSerializer) {
+               super(name, typeSerializer, initialValue);
+               this.foldFunction = requireNonNull(foldFunction);
+
+               if (foldFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
of FoldingState can not be a RichFunction.");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public FoldingState<T, ACC> bind(StateBackend stateBackend) throws 
Exception {
+               return stateBackend.createFoldingState(this);
+       }
+
+       /**
+        * Returns the fold function to be used for the folding state.
+        */
+       public FoldFunction<T, ACC> getFoldFunction() {
+               return foldFunction;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
index 8c7c608..a61433a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
@@ -48,4 +48,12 @@ public interface StateBackend {
         */
        <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> 
stateDesc) throws Exception;
 
+       /**
+        * Creates and returns a new {@link FoldingState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <T> Type of the values folded into the state
+        * @param <ACC> Type of the value in the state
+        */
+       <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 38087fc..4cf2371 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -229,7 +229,7 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
        @Override
        public String toString() {
                return getClass().getSimpleName() + 
-                               "{ name=" + name +
+                               "{name=" + name +
                                ", defaultValue=" + defaultValue +
                                ", serializer=" + serializer +
                                '}';

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index d0c4f82..beccd86 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -148,6 +150,18 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
        protected abstract <N, T> ReducingState<T> 
createReducingState(TypeSerializer<N> namespaceSerializer, 
ReducingStateDescriptor<T> stateDesc) throws Exception;
 
        /**
+        * Creates and returns a new {@link FoldingState}.
+        *
+        * @param namespaceSerializer TypeSerializer for the state namespace.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <T> Type of the values folded into the state
+        * @param <ACC> Type of the value in the state
+        */
+       abstract protected <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer, 
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+
+       /**
         * Sets the current key that is used for partitioned state.
         * @param currentKey The current key.
         */
@@ -223,6 +237,12 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
                        public <T> ReducingState<T> 
createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
                                return 
AbstractStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
                        }
+
+                       @Override
+                       public <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+                               return 
AbstractStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
+                       }
+
                });
 
                keyValueStatesByName.put(stateDescriptor.getName(), (KvState) 
kvstate);

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
new file mode 100644
index 0000000..ef1d796
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Generic implementation of {@link FoldingState} based on a wrapped {@link 
ValueState}.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ * @param <Backend> The type of {@link AbstractStateBackend} that manages this 
{@code KvState}.
+ * @param <W> Generic type that extends both the underlying {@code ValueState} 
and {@code KvState}.
+ */
+public class GenericFoldingState<K, N, T, ACC, Backend extends 
AbstractStateBackend, W extends ValueState<ACC> & KvState<K, N, 
ValueState<ACC>, ValueStateDescriptor<ACC>, Backend>>
+       implements FoldingState<T, ACC>, KvState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, Backend> {
+
+       private final W wrappedState;
+       private final FoldFunction<T, ACC> foldFunction;
+
+       /**
+        * Creates a new {@code FoldingState} that wraps the given {@link 
ValueState}. The
+        * {@code ValueState} must have the initial value of the fold as 
default value.
+        *
+        * @param wrappedState The wrapped {@code ValueState}
+        * @param foldFunction The {@code FoldFunction} to use for folding 
values into the state
+        */
+       @SuppressWarnings("unchecked")
+       public GenericFoldingState(ValueState<ACC> wrappedState, 
FoldFunction<T, ACC> foldFunction) {
+               if (!(wrappedState instanceof KvState)) {
+                       throw new IllegalArgumentException("Wrapped state must 
be a KvState.");
+               }
+               this.wrappedState = (W) wrappedState;
+               this.foldFunction = foldFunction;
+       }
+
+       @Override
+       public void setCurrentKey(K key) {
+               wrappedState.setCurrentKey(key);
+       }
+
+       @Override
+       public void setCurrentNamespace(N namespace) {
+               wrappedState.setCurrentNamespace(namespace);
+       }
+
+       @Override
+       public KvStateSnapshot<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, Backend> snapshot(
+               long checkpointId,
+               long timestamp) throws Exception {
+               KvStateSnapshot<K, N, ValueState<ACC>, 
ValueStateDescriptor<ACC>, Backend> wrappedSnapshot = wrappedState.snapshot(
+                       checkpointId,
+                       timestamp);
+               return new Snapshot<>(wrappedSnapshot, foldFunction);
+       }
+
+       @Override
+       public void dispose() {
+               wrappedState.dispose();
+       }
+
+       @Override
+       public ACC get() throws Exception {
+               return wrappedState.value();
+       }
+
+       @Override
+       public void add(T value) throws Exception {
+               ACC currentValue = wrappedState.value();
+               wrappedState.update(foldFunction.fold(currentValue, value));
+       }
+
+       @Override
+       public void clear() {
+               wrappedState.clear();
+       }
+
+       private static class Snapshot<K, N, T, ACC, Backend extends 
AbstractStateBackend> implements KvStateSnapshot<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, Backend> {
+               private static final long serialVersionUID = 1L;
+
+               private final KvStateSnapshot<K, N, ValueState<ACC>, 
ValueStateDescriptor<ACC>, Backend> wrappedSnapshot;
+
+               private final FoldFunction<T, ACC> foldFunction;
+
+               public Snapshot(KvStateSnapshot<K, N, ValueState<ACC>, 
ValueStateDescriptor<ACC>, Backend> wrappedSnapshot,
+                       FoldFunction<T, ACC> foldFunction) {
+                       this.wrappedSnapshot = wrappedSnapshot;
+                       this.foldFunction = foldFunction;
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public KvState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, Backend> restoreState(
+                       Backend stateBackend,
+                       TypeSerializer<K> keySerializer,
+                       ClassLoader classLoader,
+                       long recoveryTimestamp) throws Exception {
+                       return new GenericFoldingState((ValueState<ACC>) 
wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, 
recoveryTimestamp), foldFunction);
+               }
+
+               @Override
+               public void discardState() throws Exception {
+                       wrappedSnapshot.discardState();
+               }
+
+               @Override
+               public long getStateSize() throws Exception {
+                       return wrappedSnapshot.getStateSize();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
index c20962f..fbb0170 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
@@ -40,6 +40,12 @@ public class GenericListState<K, N, T, Backend extends 
AbstractStateBackend, W e
 
        private final W wrappedState;
 
+       /**
+        * Creates a new {@code ListState} that wraps the given {@link 
ValueState}. The
+        * {@code ValueState} must have a default value of {@code null}.
+        *
+        * @param wrappedState The wrapped {@code ValueState}
+        */
        @SuppressWarnings("unchecked")
        public GenericListState(ValueState<ArrayList<T>> wrappedState) {
                if (!(wrappedState instanceof KvState)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
index 1181c66..102e25e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
@@ -39,6 +39,13 @@ public class GenericReducingState<K, N, T, Backend extends 
AbstractStateBackend,
        private final W wrappedState;
        private final ReduceFunction<T> reduceFunction;
 
+       /**
+        * Creates a new {@code ReducingState} that wraps the given {@link 
ValueState}. The
+        * {@code ValueState} must have a default value of {@code null}.
+        *
+        * @param wrappedState The wrapped {@code ValueState}
+        * @param reduceFunction The {@code ReduceFunction} to use for 
combining values.
+        */
        @SuppressWarnings("unchecked")
        public GenericReducingState(ValueState<T> wrappedState, 
ReduceFunction<T> reduceFunction) {
                if (!(wrappedState instanceof KvState)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
new file mode 100644
index 0000000..bba6df5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link FoldingState} that is
+ * snapshotted into files.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ */
+public class FsFoldingState<K, N, T, ACC>
+       extends AbstractFsState<K, N, ACC, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>>
+       implements FoldingState<T, ACC> {
+
+       private final FoldFunction<T, ACC> foldFunction;
+
+       /**
+        * Creates a new and empty partitioned state.
+        *
+        * @param backend The file system state backend backing snapshots of 
this state
+        * @param keySerializer The serializer for the key.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains 
name
+        *                           and can create a default state value.
+        */
+       public FsFoldingState(FsStateBackend backend,
+               TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc) {
+               super(backend, keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc);
+               this.foldFunction = stateDesc.getFoldFunction();
+       }
+
+       /**
+        * Creates a new key/value state with the given state contents.
+        * This method is used to re-create key/value state with existing data, 
for example from
+        * a snapshot.
+        *
+        * @param backend The file system state backend backing snapshots of 
this state
+        * @param keySerializer The serializer for the key.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains 
name
+        *                           and can create a default state value.
+        * @param state The map of key/value pairs to initialize the state with.
+        */
+       public FsFoldingState(FsStateBackend backend,
+               TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc,
+               HashMap<N, Map<K, ACC>> state) {
+               super(backend, keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc, state);
+               this.foldFunction = stateDesc.getFoldFunction();
+       }
+
+       @Override
+       public ACC get() {
+               if (currentNSState == null) {
+                       currentNSState = state.get(currentNamespace);
+               }
+               if (currentNSState != null) {
+                       ACC value = currentNSState.get(currentKey);
+                       return value != null ? value : 
stateDesc.getDefaultValue();
+               }
+               return stateDesc.getDefaultValue();
+       }
+
+       @Override
+       public void add(T value) throws IOException {
+               if (currentKey == null) {
+                       throw new RuntimeException("No key available.");
+               }
+
+               if (currentNSState == null) {
+                       currentNSState = new HashMap<>();
+                       state.put(currentNamespace, currentNSState);
+               }
+
+               ACC currentValue = currentNSState.get(currentKey);
+               try {
+                       if (currentValue == null) {
+                               currentNSState.put(currentKey, 
foldFunction.fold(stateDesc.getDefaultValue(), value));
+                       } else {
+                               currentNSState.put(currentKey, 
foldFunction.fold(currentValue, value));
+
+                       }
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not add value to 
folding state.", e);
+               }
+       }
+
+       @Override
+       public KvStateSnapshot<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, FsStateBackend> createHeapSnapshot(Path 
filePath) {
+               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), stateSerializer, stateDesc, filePath);
+       }
+
+
+       public static class Snapshot<K, N, T, ACC> extends 
AbstractFsStateSnapshot<K, N, ACC, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>> {
+               private static final long serialVersionUID = 1L;
+
+               public Snapshot(TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       TypeSerializer<ACC> stateSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDescs,
+                       Path filePath) {
+                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, filePath);
+               }
+
+               @Override
+               public KvState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, FsStateBackend> createFsState(FsStateBackend 
backend, HashMap<N, Map<K, ACC>> stateMap) {
+                       return new FsFoldingState<>(backend, keySerializer, 
namespaceSerializer, stateDesc, stateMap);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 37c1392..77d540b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -287,6 +289,11 @@ public class FsStateBackend extends AbstractStateBackend {
                return new FsReducingState<>(this, keySerializer, 
namespaceSerializer, stateDesc);
        }
 
+       @Override
+       protected <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+               return new FsFoldingState<>(this, keySerializer, 
namespaceSerializer, stateDesc);
+       }
 
        @Override
        public <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
new file mode 100644
index 0000000..07b677b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link FoldingState} that is
+ * snapshotted into a serialized memory copy.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ */
+public class MemFoldingState<K, N, T, ACC>
+       extends AbstractMemState<K, N, ACC, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>>
+       implements FoldingState<T, ACC> {
+
+       private final FoldFunction<T, ACC> foldFunction;
+
+       public MemFoldingState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc) {
+               super(keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc);
+               this.foldFunction = stateDesc.getFoldFunction();
+       }
+
+       public MemFoldingState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc,
+               HashMap<N, Map<K, ACC>> state) {
+               super(keySerializer, namespaceSerializer, 
stateDesc.getSerializer(), stateDesc, state);
+               this.foldFunction = stateDesc.getFoldFunction();
+       }
+
+       @Override
+       public ACC get() {
+               if (currentNSState == null) {
+                       currentNSState = state.get(currentNamespace);
+               }
+               if (currentNSState != null) {
+                       ACC value = currentNSState.get(currentKey);
+                       return value != null ? value : 
stateDesc.getDefaultValue();
+               }
+               return stateDesc.getDefaultValue();
+       }
+
+       @Override
+       public void add(T value) throws IOException {
+               if (currentKey == null) {
+                       throw new RuntimeException("No key available.");
+               }
+
+               if (currentNSState == null) {
+                       currentNSState = new HashMap<>();
+                       state.put(currentNamespace, currentNSState);
+               }
+
+               ACC currentValue = currentNSState.get(currentKey);
+               try {
+                       if (currentValue == null) {
+                               currentNSState.put(currentKey, 
foldFunction.fold(stateDesc.getDefaultValue(), value));
+                       } else {
+                                       currentNSState.put(currentKey, 
foldFunction.fold(currentValue, value));
+
+                       }
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not add value to 
folding state.", e);
+               }
+       }
+
+       @Override
+       public KvStateSnapshot<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createHeapSnapshot(byte[] 
bytes) {
+               return new Snapshot<>(getKeySerializer(), 
getNamespaceSerializer(), stateSerializer, stateDesc, bytes);
+       }
+
+       public static class Snapshot<K, N, T, ACC> extends 
AbstractMemStateSnapshot<K, N, ACC, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>> {
+               private static final long serialVersionUID = 1L;
+
+               public Snapshot(TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       TypeSerializer<ACC> stateSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDescs, byte[] data) 
{
+                       super(keySerializer, namespaceSerializer, 
stateSerializer, stateDescs, data);
+               }
+
+               @Override
+               public KvState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createMemState(HashMap<N, 
Map<K, ACC>> stateMap) {
+                       return new MemFoldingState<>(keySerializer, 
namespaceSerializer, stateDesc, stateMap);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 2b7b5f1..7b9d21b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -97,6 +99,11 @@ public class MemoryStateBackend extends AbstractStateBackend 
{
                return new MemReducingState<>(keySerializer, 
namespaceSerializer, stateDesc);
        }
 
+       @Override
+       public <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer, 
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+               return new MemFoldingState<>(keySerializer, 
namespaceSerializer, stateDesc);
+       }
+
        /**
         * Serialized the given state into bytes using Java serialization and 
creates a state handle that
         * can re-create that state.

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 6083bd6..27dee6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -22,7 +22,10 @@ import com.google.common.base.Joiner;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -410,6 +413,108 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                }
        }
 
+       /**
+        * Exercises folding state on the backend under test: folding per key starting from the
+        * initial value, clearing a key, and drawing two snapshots at different points in time that
+        * must each restore to the contents the state had when the snapshot was taken.
+        *
+        * <p>
+        * Unchecked casts are suppressed on the individual declarations that need them. (A single
+        * comma-joined {@code @SuppressWarnings} string is not a recognized warning name and would
+        * suppress nothing.)
+        */
+       @Test
+       public void testFoldingState() {
+               try {
+                       backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+                       // fold function appends ",<value>" to the accumulator; the initial value is "Fold-Initial:"
+                       FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
+                               "Fold-Initial:",
+                               new FoldFunction<Integer, String>() {
+                                       private static final long serialVersionUID = 1L;
+
+                                       @Override
+                                       public String fold(String acc, Integer value) throws Exception {
+                                               return acc + "," + value;
+                                       }
+                               },
+                               String.class);
+                       FoldingState<Integer, String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+                       // the same object is also used through its KvState view to switch keys and snapshot
+                       @SuppressWarnings("unchecked")
+                       KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> kv =
+                               (KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B>) state;
+
+                       // some modifications to the state; a key without any added value yields the initial value
+                       kv.setCurrentKey(1);
+                       assertEquals("Fold-Initial:", state.get());
+                       state.add(1);
+                       kv.setCurrentKey(2);
+                       assertEquals("Fold-Initial:", state.get());
+                       state.add(2);
+                       kv.setCurrentKey(1);
+                       assertEquals("Fold-Initial:,1", state.get());
+
+                       // draw a snapshot
+                       KvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> snapshot1 =
+                               kv.snapshot(682375462378L, 2);
+
+                       // make some more modifications: key 1 is cleared first, so its fold restarts from the initial value
+                       kv.setCurrentKey(1);
+                       state.clear();
+                       state.add(101);
+                       kv.setCurrentKey(2);
+                       state.add(102);
+                       kv.setCurrentKey(3);
+                       state.add(103);
+
+                       // draw another snapshot
+                       KvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> snapshot2 =
+                               kv.snapshot(682375462379L, 4);
+
+                       // validate the original state
+                       kv.setCurrentKey(1);
+                       assertEquals("Fold-Initial:,101", state.get());
+                       kv.setCurrentKey(2);
+                       assertEquals("Fold-Initial:,2,102", state.get());
+                       kv.setCurrentKey(3);
+                       assertEquals("Fold-Initial:,103", state.get());
+
+                       kv.dispose();
+
+                       // restore the first snapshot and validate it
+                       KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> restored1 = snapshot1.restoreState(
+                               backend,
+                               IntSerializer.INSTANCE,
+                               this.getClass().getClassLoader(), 10);
+
+                       snapshot1.discardState();
+
+                       @SuppressWarnings("unchecked")
+                       FoldingState<Integer, String> restored1State = (FoldingState<Integer, String>) restored1;
+
+                       restored1.setCurrentKey(1);
+                       assertEquals("Fold-Initial:,1", restored1State.get());
+                       restored1.setCurrentKey(2);
+                       assertEquals("Fold-Initial:,2", restored1State.get());
+
+                       restored1.dispose();
+
+                       // restore the second snapshot and validate it
+                       KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> restored2 = snapshot2.restoreState(
+                               backend,
+                               IntSerializer.INSTANCE,
+                               this.getClass().getClassLoader(), 20);
+
+                       snapshot2.discardState();
+
+                       @SuppressWarnings("unchecked")
+                       FoldingState<Integer, String> restored2State = (FoldingState<Integer, String>) restored2;
+
+                       restored2.setCurrentKey(1);
+                       assertEquals("Fold-Initial:,101", restored2State.get());
+                       restored2.setCurrentKey(2);
+                       assertEquals("Fold-Initial:,2,102", restored2State.get());
+                       restored2.setCurrentKey(3);
+                       assertEquals("Fold-Initial:,103", restored2State.get());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
 
        @Test
        public void testValueStateRestoreWithWrongSerializers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 2902795..d8da998 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -33,8 +33,9 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import 
org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -144,7 +145,7 @@ public class AllWindowedStream<T, W extends Window> {
                function = input.getExecutionEnvironment().clean(function);
 
                String callLocation = Utils.getCallLocationName();
-               String udfName = "Reduce at " + callLocation;
+               String udfName = "AllWindowedStream." + callLocation;
 
                SingleOutputStreamOperator<T, ?> result = 
createFastTimeOperatorIfValid(function, input.getType(), udfName);
                if (result != null) {
@@ -185,13 +186,15 @@ public class AllWindowedStream<T, W extends Window> {
         * @return The data stream that is the result of applying the fold 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, 
FoldFunction<T, R> function) {
-               //clean the closure
-               function = input.getExecutionEnvironment().clean(function);
+               if (function instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction. " +
+                               "Please use apply(FoldFunction, WindowFunction) 
instead.");
+               }
 
                TypeInformation<R> resultType = 
TypeExtractor.getFoldReturnTypes(function, input.getType(),
-                               Utils.getCallLocationName(), true);
+                       Utils.getCallLocationName(), true);
 
-               return apply(new FoldAllWindowFunction<W, T, R>(initialValue, 
function), resultType);
+               return fold(initialValue, function, resultType);
        }
 
        /**
@@ -203,9 +206,12 @@ public class AllWindowedStream<T, W extends Window> {
         * @return The data stream that is the result of applying the fold 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, 
FoldFunction<T, R> function, TypeInformation<R> resultType) {
-               //clean the closure
-               function = input.getExecutionEnvironment().clean(function);
-               return apply(new FoldAllWindowFunction<W, T, R>(initialValue, 
function), resultType);
+               if (function instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction. " +
+                               "Please use apply(FoldFunction, WindowFunction) 
instead.");
+               }
+
+               return apply(initialValue, function, new 
PassThroughAllWindowFunction<W, R>(), resultType);
        }
 
        /**
@@ -244,7 +250,7 @@ public class AllWindowedStream<T, W extends Window> {
                function = input.getExecutionEnvironment().clean(function);
 
                String callLocation = Utils.getCallLocationName();
-               String udfName = "WindowApply at " + callLocation;
+               String udfName = "AllWindowedStream." + callLocation;
 
                SingleOutputStreamOperator<R, ?> result = 
createFastTimeOperatorIfValid(function, resultType, udfName);
                if (result != null) {
@@ -321,7 +327,7 @@ public class AllWindowedStream<T, W extends Window> {
                preAggregator = 
input.getExecutionEnvironment().clean(preAggregator);
 
                String callLocation = Utils.getCallLocationName();
-               String udfName = "WindowApply at " + callLocation;
+               String udfName = "AllWindowedStream." + callLocation;
 
                String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
 
@@ -348,6 +354,81 @@ public class AllWindowedStream<T, W extends Window> {
                return input.transform(opName, resultType, 
operator).setParallelism(1);
        }
 
+       /**
+        * Applies the given window function to each window, combining the window's elements with
+        * the given fold function and initial value. The output of the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * The result type is extracted from the fold function and the input type; the call is then
+        * forwarded to the overload that takes explicit result type information.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for aggregation.
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window function to the window.
+        */
+       public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
+               final TypeInformation<R> foldResultType = TypeExtractor.getFoldReturnTypes(
+                       foldFunction,
+                       input.getType(),
+                       Utils.getCallLocationName(),
+                       true);
+
+               return apply(initialValue, foldFunction, function, foldResultType);
+       }
+
+       /**
+        * Applies the given window function to each window, combining the window's elements with
+        * the given fold function and initial value. The output of the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * The fold function, the initial value and the window function are wrapped in a
+        * {@code FoldApplyAllWindowFunction}. That wrapper is run by an
+        * {@code EvictingNonKeyedWindowOperator} when an evictor is set and by a
+        * {@code NonKeyedWindowOperator} otherwise. The resulting operator always has parallelism 1.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for aggregation.
+        * @param function The window function.
+        * @param resultType Type information for the result type of the window function
+        * @return The data stream that is the result of applying the window function to the window.
+        * @throws UnsupportedOperationException if {@code foldFunction} is a {@code RichFunction}
+        */
+       public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) {
+               if (foldFunction instanceof RichFunction) {
+                       // the rejected argument is the fold function, so the message must name it
+                       throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
+               }
+
+               //clean the closures
+               function = input.getExecutionEnvironment().clean(function);
+               foldFunction = input.getExecutionEnvironment().clean(foldFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "AllWindowedStream." + callLocation;
+
+               String opName;
+
+               OneInputStreamOperator<T, R> operator;
+
+               // passed to enableSetProcessingTime(...) on whichever operator is created below
+               boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+               if (evictor != null) {
+                       // with an evictor: evicting operator, and the evictor also shows up in the operator name
+                       opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+                               windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               new HeapWindowBuffer.Factory<T>(),
+                               new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
+                               trigger,
+                               evictor).enableSetProcessingTime(setProcessingTime);
+
+               } else {
+                       opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+                       operator = new NonKeyedWindowOperator<>(windowAssigner,
+                               windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               new HeapWindowBuffer.Factory<T>(),
+                               new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
+                               trigger).enableSetProcessingTime(setProcessingTime);
+               }
+
+               // non-keyed ("all") windows are always evaluated by a single parallel instance
+               return input.transform(opName, resultType, operator).setParallelism(1);
+       }
+
        // 
------------------------------------------------------------------------
        //  Aggregations on the  windows
        // 
------------------------------------------------------------------------

Reply via email to