[FLINK-5821] [state backends] Rename the 'StateBackend' to 'StateBinder' and 
create root StateBackend interface

The name 'StateBinder' more accurately reflects what the interface does, and the 
rename frees up the name 'StateBackend'.

The 'StateBackend' interface is now the root of the State Backend hierarchy 
(previously that was 'AbstractStateBackend')

This also substantially extends the JavaDocs of the core state classes, such as 
StateBackend and StateObject.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f15603d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f15603d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f15603d8

Branch: refs/heads/master
Commit: f15603d81dad4861175093f4ad22eb2f8ccee4a0
Parents: a404796
Author: Stephan Ewen <[email protected]>
Authored: Mon Feb 13 14:29:37 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Feb 21 01:16:26 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |   3 +-
 .../state/AggregatingStateDescriptor.java       |   4 +-
 .../common/state/FoldingStateDescriptor.java    |   4 +-
 .../api/common/state/ListStateDescriptor.java   |   4 +-
 .../common/state/ReducingStateDescriptor.java   |   4 +-
 .../flink/api/common/state/StateBackend.java    |  73 ----------
 .../flink/api/common/state/StateBinder.java     |  73 ++++++++++
 .../flink/api/common/state/StateDescriptor.java |   8 +-
 .../api/common/state/ValueStateDescriptor.java  |   4 +-
 .../api/common/state/ListStateDescriptor.java   |   4 +-
 .../state/AbstractKeyedStateBackend.java        |   4 +-
 .../runtime/state/AbstractStateBackend.java     |  31 ++--
 .../flink/runtime/state/StateBackend.java       | 145 +++++++++++++++++++
 .../apache/flink/runtime/state/StateObject.java |  41 ++++--
 14 files changed, 276 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 06cceda..6b09a8a 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -19,7 +19,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
@@ -50,7 +49,7 @@ import java.util.UUID;
 import static java.util.Objects.requireNonNull;
 
 /**
- * A {@link StateBackend} that stores its state in {@code RocksDB}. This state 
backend can
+ * A State Backend that stores its state in {@code RocksDB}. This state 
backend can
  * store very large state that exceeds memory and spills to disk.
  *
  * <p>All key/value state (including windows) is stored in the key/value index 
of RocksDB.

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index abdac91..b7378d6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -96,8 +96,8 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends 
StateDescriptor<Ag
        // 
------------------------------------------------------------------------
 
        @Override
-       public AggregatingState<IN, OUT> bind(StateBackend stateBackend) throws 
Exception {
-               return stateBackend.createAggregatingState(this);
+       public AggregatingState<IN, OUT> bind(StateBinder stateBinder) throws 
Exception {
+               return stateBinder.createAggregatingState(this);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 143945e..73bfaa8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -97,8 +97,8 @@ public class FoldingStateDescriptor<T, ACC> extends 
StateDescriptor<FoldingState
        // 
------------------------------------------------------------------------
        
        @Override
-       public FoldingState<T, ACC> bind(StateBackend stateBackend) throws 
Exception {
-               return stateBackend.createFoldingState(this);
+       public FoldingState<T, ACC> bind(StateBinder stateBinder) throws 
Exception {
+               return stateBinder.createFoldingState(this);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index c03f8cb..ea28ad2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -79,8 +79,8 @@ public class ListStateDescriptor<T> extends 
StateDescriptor<ListState<T>, List<T
        // 
------------------------------------------------------------------------
 
        @Override
-       public ListState<T> bind(StateBackend stateBackend) throws Exception {
-               return stateBackend.createListState(this);
+       public ListState<T> bind(StateBinder stateBinder) throws Exception {
+               return stateBinder.createListState(this);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index a1d4225..3edf1ca 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -86,8 +86,8 @@ public class ReducingStateDescriptor<T> extends 
StateDescriptor<ReducingState<T>
        // 
------------------------------------------------------------------------
        
        @Override
-       public ReducingState<T> bind(StateBackend stateBackend) throws 
Exception {
-               return stateBackend.createReducingState(this);
+       public ReducingState<T> bind(StateBinder stateBinder) throws Exception {
+               return stateBinder.createReducingState(this);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
deleted file mode 100644
index f9d1af7..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * The {@code StateBackend} is used by {@link StateDescriptor} instances to 
create actual state
- * representations.
- */
-@PublicEvolving
-public interface StateBackend {
-
-       /**
-        * Creates and returns a new {@link ValueState}.
-        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
-        *
-        * @param <T> The type of the value that the {@code ValueState} can 
store.
-        */
-       <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) 
throws Exception;
-
-       /**
-        * Creates and returns a new {@link ListState}.
-        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
-        *
-        * @param <T> The type of the values that the {@code ListState} can 
store.
-        */
-       <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) 
throws Exception;
-
-       /**
-        * Creates and returns a new {@link ReducingState}.
-        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
-        *
-        * @param <T> The type of the values that the {@code ReducingState} can 
store.
-        */
-       <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> 
stateDesc) throws Exception;
-
-       /**
-        * Creates and returns a new {@link AggregatingState}.
-        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
-        *
-        * @param <IN> The type of the values that go into the aggregating state
-        * @param <ACC> The type of the values that are stored in the 
aggregating state   
-        * @param <OUT> The type of the values that come out of the aggregating 
state   
-        */
-       <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
-                       AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) 
throws Exception;
-
-       /**
-        * Creates and returns a new {@link FoldingState}.
-        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
-        *
-        * @param <T> Type of the values folded into the state
-        * @param <ACC> Type of the value in the state
-        */
-       <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
new file mode 100644
index 0000000..08dfc90
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The {@code StateBinder} is used by {@link StateDescriptor} instances to 
create actual
+ * {@link State} objects.
+ */
+@Internal
+public interface StateBinder {
+
+       /**
+        * Creates and returns a new {@link ValueState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <T> The type of the value that the {@code ValueState} can 
store.
+        */
+       <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) 
throws Exception;
+
+       /**
+        * Creates and returns a new {@link ListState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <T> The type of the values that the {@code ListState} can 
store.
+        */
+       <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) 
throws Exception;
+
+       /**
+        * Creates and returns a new {@link ReducingState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <T> The type of the values that the {@code ReducingState} can 
store.
+        */
+       <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> 
stateDesc) throws Exception;
+
+       /**
+        * Creates and returns a new {@link AggregatingState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <IN> The type of the values that go into the aggregating state
+        * @param <ACC> The type of the values that are stored in the 
aggregating state   
+        * @param <OUT> The type of the values that come out of the aggregating 
state   
+        */
+       <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
+                       AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) 
throws Exception;
+
+       /**
+        * Creates and returns a new {@link FoldingState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <T> Type of the values folded into the state
+        * @param <ACC> Type of the value in the state
+        */
+       <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index bc909e6..332e649 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -39,7 +39,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * Base class for state descriptors. A {@code StateDescriptor} is used for 
creating partitioned
  * {@link State} in stateful operations. This contains the name and can create 
an actual state
- * object given a {@link StateBackend} using {@link #bind(StateBackend)}.
+ * object given a {@link StateBinder} using {@link #bind(StateBinder)}.
  *
  * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link 
#hashCode()}.
  *
@@ -208,11 +208,11 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
        }
 
        /**
-        * Creates a new {@link State} on the given {@link StateBackend}.
+        * Creates a new {@link State} on the given {@link StateBinder}.
         *
-        * @param stateBackend The {@code StateBackend} on which to create the 
{@link State}.
+        * @param stateBinder The {@code StateBinder} on which to create the 
{@link State}.
         */
-       public abstract S bind(StateBackend stateBackend) throws Exception;
+       public abstract S bind(StateBinder stateBinder) throws Exception;
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index b3006c4..3afc8a7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -124,8 +124,8 @@ public class ValueStateDescriptor<T> extends 
StateDescriptor<ValueState<T>, T> {
        // 
------------------------------------------------------------------------
        
        @Override
-       public ValueState<T> bind(StateBackend stateBackend) throws Exception {
-               return stateBackend.createValueState(this);
+       public ValueState<T> bind(StateBinder stateBinder) throws Exception {
+               return stateBinder.createValueState(this);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
index 28bc812..3b1af54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -20,7 +20,7 @@ package org.apache.flink.migration.api.common.state;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateBinder;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -70,7 +70,7 @@ public class ListStateDescriptor<T> extends 
StateDescriptor<ListState<T>, T> {
        // 
------------------------------------------------------------------------
 
        @Override
-       public ListState<T> bind(StateBackend stateBackend) throws Exception {
+       public ListState<T> bind(StateBinder stateBinder) throws Exception {
                throw new IllegalStateException("Cannot bind states with a 
legacy state descriptor.");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index c8e0d0d..fe5d1cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateBinder;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -264,7 +264,7 @@ public abstract class AbstractKeyedStateBackend<K>
                }
 
                // create a new blank key/value state
-               S state = stateDescriptor.bind(new StateBackend() {
+               S state = stateDescriptor.bind(new StateBinder() {
                        @Override
                        public <T> ValueState<T> 
createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
                                return 
AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 60d035a..bc4594a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
@@ -26,27 +27,18 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import java.io.IOException;
 
 /**
- * A state backend defines how state is stored and snapshotted during 
checkpoints.
+ * An abstract base implementation of the {@link StateBackend} interface.
  */
-public abstract class AbstractStateBackend implements java.io.Serializable {
+@PublicEvolving
+public abstract class AbstractStateBackend implements StateBackend, 
java.io.Serializable {
        private static final long serialVersionUID = 4620415814639230247L;
 
-       /**
-        * Creates a {@link CheckpointStreamFactory} that can be used to create 
streams
-        * that should end up in a checkpoint.
-        *
-        * @param jobId              The {@link JobID} of the job for which we 
are creating checkpoint streams.
-        * @param operatorIdentifier An identifier of the operator for which we 
create streams.
-        */
+       @Override
        public abstract CheckpointStreamFactory createStreamFactory(
                        JobID jobId,
-                       String operatorIdentifier
-       ) throws IOException;
+                       String operatorIdentifier) throws IOException;
 
-       /**
-        * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for keeping keyed state
-        * and can be checkpointed to checkpoint streams.
-        */
+       @Override
        public abstract <K> AbstractKeyedStateBackend<K> 
createKeyedStateBackend(
                        Environment env,
                        JobID jobID,
@@ -54,16 +46,13 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry
-       ) throws Exception;
+                       TaskKvStateRegistry kvStateRegistry) throws Exception;
 
-       /**
-        * Creates a new {@link OperatorStateBackend} that can be used for 
storing partitionable operator
-        * state in checkpoint streams.
-        */
+       @Override
        public OperatorStateBackend createOperatorStateBackend(
                        Environment env,
                        String operatorIdentifier) throws Exception {
+
                return new 
DefaultOperatorStateBackend(env.getUserClassLoader());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
new file mode 100644
index 0000000..846df89
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+
+import java.io.IOException;
+
+/**
+ * A <b>State Backend</b> defines how the state of a streaming application is 
stored and
+ * checkpointed. Different State Backends store their state in different 
fashions, and use
+ * different data structures to hold the state of a running application.
+ *
+ * <p>For example, the {@link 
org.apache.flink.runtime.state.memory.MemoryStateBackend memory state backend}
+ * keeps working state in the memory of the TaskManager and stores checkpoints 
in the memory of the
+ * JobManager. The backend is lightweight and without additional dependencies, 
but not highly available
+ * and supports only small state.
+ *
+ * <p>The {@link org.apache.flink.runtime.state.filesystem.FsStateBackend file 
system state backend}
+ * keeps working state in the memory of the TaskManager and stores state 
checkpoints in a filesystem
+ * (typically a replicated highly-available filesystem, like <a 
href="https://hadoop.apache.org/";>HDFS</a>,
+ * <a href="https://ceph.com/";>Ceph</a>, <a 
href="https://aws.amazon.com/documentation/s3/";>S3</a>,
+ * <a href="https://cloud.google.com/storage/";>GCS</a>, etc).
+ * 
+ * <p>The {@code RocksDBStateBackend} stores working state in <a 
href="http://rocksdb.org/";>RocksDB</a>,
+ * and checkpoints the state by default to a filesystem (similar to the {@code 
FsStateBackend}).
+ * 
+ * <h2>Raw Bytes Storage and Backends</h2>
+ * 
+ * The {@code StateBackend} creates services for <i>raw bytes storage</i> and 
for <i>keyed state</i>
+ * and <i>operator state</i>.
+ * 
+ * <p>The <i>raw bytes storage</i> (through the {@link 
CheckpointStreamFactory}) is the fundamental
+ * service that simply stores bytes in a fault tolerant fashion. This service 
is used by the JobManager
+ * to store checkpoint and recovery metadata and is typically also used by the 
keyed- and operator state
+ * backends to store checkpointed state.
+ *
+ * <p>The {@link AbstractKeyedStateBackend} and {@link OperatorStateBackend} 
created by this state
+ * backend define how to hold the working state for keys and operators. They 
also define how to checkpoint
+ * that state, frequently using the raw bytes storage (via the {@code 
CheckpointStreamFactory}).
+ * However, it is also possible that for example a keyed state backend simply 
implements the bridge to
+ * a key/value store, and that it does not need to store anything in the raw 
byte storage upon a
+ * checkpoint.
+ * 
+ * <h2>Serializability</h2>
+ * 
+ * State Backends need to be {@link java.io.Serializable serializable}, 
because they are distributed
+ * across parallel processes (for distributed execution) together with the 
streaming application code. 
+ * 
+ * <p>Because of that, {@code StateBackend} implementations (typically 
subclasses
+ * of {@link AbstractStateBackend}) are meant to be like <i>factories</i> that 
create the proper
+ * state stores that provide access to the persistent storage and hold the 
keyed- and operator
+ * state data structures. That way, the State Backend can be very lightweight 
(containing only
+ * configuration), which makes it easier to serialize.
+ * 
+ * 
+ * <h2>Thread Safety</h2>
+ * 
+ * State backend implementations have to be thread-safe. Multiple threads may 
be creating
+ * streams and keyed-/operator state backends concurrently.
+ */
+@PublicEvolving
+public interface StateBackend extends java.io.Serializable {
+
+       // 
------------------------------------------------------------------------
+       //  Persistent Bytes Storage
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a {@link CheckpointStreamFactory} that can be used to create 
streams
+        * that should end up in a checkpoint.
+        *
+        * @param jobId              The {@link JobID} of the job for which we 
are creating checkpoint streams.
+        * @param operatorIdentifier An identifier of the operator for which we 
create streams.
+        */
+       CheckpointStreamFactory createStreamFactory(JobID jobId, String 
operatorIdentifier) throws IOException;
+
+       // 
------------------------------------------------------------------------
+       //  Structure Backends 
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for holding <b>keyed state</b>
+        * and checkpointing it.
+        * 
+        * <p><i>Keyed State</i> is state where each value is bound to a key.
+        * 
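+        * @param env The runtime environment of the executing task.
+        * @param jobID The {@link JobID} of the job to which the operator belongs.
+        * @param operatorIdentifier The identifier of the operator whose state should be stored.
+        * @param keySerializer The serializer for the keys by which the state is organized.
+        * @param numberOfKeyGroups The total number of key groups.
+        * @param keyGroupRange The range of key groups that the created backend is responsible for.
+        * @param kvStateRegistry The task's key/value state registry.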
+        * 
+        * @param <K> The type of the keys by which the state is organized.
+        *     
+        * @return The Keyed State Backend for the given job, operator, and key 
group range.
+        * 
+        * @throws Exception This method may forward all exceptions that occur 
while instantiating the backend.
+        */
+       <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+                       Environment env,
+                       JobID jobID,
+                       String operatorIdentifier,
+                       TypeSerializer<K> keySerializer,
+                       int numberOfKeyGroups,
+                       KeyGroupRange keyGroupRange,
+                       TaskKvStateRegistry kvStateRegistry) throws Exception;
+
+       /**
+        * Creates a new {@link OperatorStateBackend} that can be used for 
storing operator state.
+        * 
+        * <p>Operator state is state that is associated with parallel operator 
(or function) instances,
+        * rather than with keys.
+        * 
+        * @param env The runtime environment of the executing task.
+        * @param operatorIdentifier The identifier of the operator whose state 
should be stored.
+        * 
+        * @return The OperatorStateBackend for the operator identified by the job 
and operator identifier.
+        * 
+        * @throws Exception This method may forward all exceptions that occur 
while instantiating the backend.
+        */
+       OperatorStateBackend createOperatorStateBackend(Environment env, String 
operatorIdentifier) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
index 9ff2fa8..7f1dd18 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
@@ -19,28 +19,45 @@
 package org.apache.flink.runtime.state;
 
 /**
- * Base of all types that represent checkpointed state. Specializations are for
- * example {@link StateHandle StateHandles} (directly resolve to state).
+ * Base of all handles that represent checkpointed state in some form. The 
object may hold
+ * the (small) state directly, or contain a file path (state is in the file), 
or contain the
+ * metadata to access the state stored in some external database.
  *
- * <p>State objects define how to:
- * <ul>
- *     <li><b>Discard State</b>: The {@link #discardState()} method defines 
how state is permanently
- *         disposed/deleted. After that method call, state may not be 
recoverable any more.</li>
- * </ul>
+ * <p>State objects define how to {@link #discardState() discard state} and 
how to access the
+ * {@link #getStateSize() size of the state}.
+ * 
+ * <p>State Objects are transported via RPC between <i>JobManager</i> and
+ * <i>TaskManager</i> and must be {@link java.io.Serializable serializable} to 
support that.
+ * 
+ * <p>Some State Objects are stored in the checkpoint/savepoint metadata. For 
long-term
+ * compatibility, they are not stored via {@link java.io.Serializable Java 
Serialization},
+ * but through custom serializers.
  */
 public interface StateObject extends java.io.Serializable {
 
        /**
         * Discards the state referred to by this handle, to free up resources 
in
-        * the persistent storage. This method is called when the handle will 
not be
-        * used any more.
+        * the persistent storage. This method is called when the state 
represented by this
+        * object will not be used any more.
         */
        void discardState() throws Exception;
 
        /**
-        * Returns the size of the state in bytes.
-        *
-        * <p>If the the size is not known, return {@code 0}.
+        * Returns the size of the state in bytes. If the size is not 
known, this
+        * method should return {@code 0}.
+        * 
+        * <p>The values produced by this method are only used for 
informational purposes and
+        * for metrics/monitoring. If this method returns wrong values, the 
checkpoints and recovery
+        * will still behave correctly. However, efficiency may be impacted 
(wrong space pre-allocation)
+        * and functionality that depends on metrics (like monitoring) will be 
impacted.
+        * 
+        * <p>Note for implementors: This method should not perform any I/O 
operations
+        * while obtaining the state size (hence it does not declare throwing 
an {@code IOException}).
+        * Instead, the state size should be stored in the state object, or 
should be computable from
+        * the state stored in this object.
+        * The reason is that this method is called frequently by several parts 
of the checkpointing mechanism,
+        * and issuing I/O requests from this method accumulates a heavy I/O 
load on the storage
+        * system at higher scale.
         *
         * @return Size of the state in bytes.
         */

Reply via email to