[FLINK-5991] [DataStream] Expose Union ListState for operator state

This commit exposes the union list state scheme for managed operator state.
The actual functionality had already been added to the
`DefaultOperatorStateBackend`, so this change simply exposes the feature
through the `OperatorStateStore` interface.

This commit also updates the documentation for managed operator state so
that it covers the new union list state scheme. It also clarifies the
distinction between keyed and non-keyed state data structures by
emphasizing their semantic differences in the Javadocs of the state access methods.

This closes #3508.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ef4900a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ef4900a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ef4900a

Branch: refs/heads/master
Commit: 2ef4900aa279e75844a9f8536cfe007c2542187d
Parents: a1aab64
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Wed Apr 19 02:53:31 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Wed Apr 19 10:33:29 2017 +0800

----------------------------------------------------------------------
 docs/dev/stream/state.md                        | 109 ++++++++++++-------
 .../api/common/state/OperatorStateStore.java    |  36 +++++-
 .../state/DefaultOperatorStateBackend.java      |   8 +-
 .../runtime/state/OperatorStateBackendTest.java |  21 ++--
 .../test/checkpointing/RescalingITCase.java     |   9 +-
 5 files changed, 122 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 40522e1..276842d 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -64,16 +64,12 @@ for one or more Key Groups.
 
 With *Operator State* (or *non-keyed state*), each operator state is
 bound to one parallel operator instance.
-The Kafka source connector is a good motivating example for the use of Operator State
-in Flink. Each parallel instance of this Kafka consumer maintains a map
+The [Kafka Connector](../connectors/kafka.html) is a good motivating example for the use of Operator State
+in Flink. Each parallel instance of the Kafka consumer maintains a map
 of topic partitions and offsets as its Operator State.
 
 The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution; the following are currently defined:
-
-  - **List-style redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
-    all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
-    Each operator gets a sublist, which can be empty, or contain one or more elements.
+parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.
 
 ## Raw and Managed State
 
@@ -233,32 +229,13 @@ val counts: DataStream[(String, Int)] = stream
 
 ## Using Managed Operator State
 
-A stateful function can implement either the more general `CheckpointedFunction`
+To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
 interface, or the `ListCheckpointed<T extends Serializable>` interface.
 
-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
-thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-##### ListCheckpointed
+#### CheckpointedFunction
 
-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-On `snapshotState()` the operator should return a list of objects to checkpoint and
-`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
-return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
-
-##### CheckpointedFunction
-
-The `CheckpointedFunction` interface also requires the implementation of two methods:
+The `CheckpointedFunction` interface provides access to non-keyed state with different
+redistribution schemes. It requires the implementation of two methods:
 
 {% highlight java %}
 void snapshotState(FunctionSnapshotContext context) throws Exception;
@@ -266,12 +243,30 @@ void snapshotState(FunctionSnapshotContext context) throws Exception;
 void initializeState(FunctionInitializationContext context) throws Exception;
 {% endhighlight %}
 
-Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
-or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
+Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
+is called every time the user-defined function is initialized, whether the function is being initialized for the first time
+or is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
 only the place where different types of state are initialized, but also where state recovery logic is included.
 
-This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
-uses state to buffer elements before sending them to the outside world:
+Currently, list-style managed operator state is supported. The state
+is expected to be a `List` of *serializable* objects, independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
+non-keyed state can be redistributed. Depending on the state accessing method,
+the following redistribution schemes are defined:
+
+  - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
+    all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
+    Each operator gets a sublist, which can be empty, or contain one or more elements.
+    As an example, if with parallelism 1 the checkpointed state of an operator
+    contains elements `element1` and `element2`, then after increasing the parallelism to 2, `element1` may end up in operator instance 0,
+    while `element2` goes to operator instance 1.
+
+  - **Union redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
+    all lists. On restore/redistribution, each operator gets the complete list of state elements.
+
+Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction`
+to buffer elements before sending them to the outside world. It demonstrates
+the basic even-split redistribution list state:
 
 {% highlight java %}
 public class BufferingSink
@@ -311,8 +306,13 @@ public class BufferingSink
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        checkpointedState = context.getOperatorStateStore().
-            getSerializableListState("buffered-elements");
+        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+            new ListStateDescriptor<>(
+                "buffered-elements",
+                TypeInformation.of(
+                    new TypeHint<Tuple2<String, Integer>>() {}));
+
+        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 
         if (context.isRestored()) {
             for (Tuple2<String, Integer> element : checkpointedState.get()) {
@@ -329,12 +329,29 @@ public class BufferingSink
 }
 {% endhighlight %}
 
-
 The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize
 the non-keyed state "containers". These are a container of type `ListState` where the non-keyed state objects
 are going to be stored upon checkpointing.
 
-`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
+Note how the state is initialized, similar to keyed state,
+with a `StateDescriptor` that contains the state name and information
+about the type of the value that the state holds:
+
+{% highlight java %}
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "buffered-elements",
+        TypeInformation.of(
+            new TypeHint<Tuple2<String, Integer>>() {}));
+
+checkpointedState = context.getOperatorStateStore().getListState(descriptor);
+{% endhighlight %}
+
+The names of the state access methods contain the redistribution
+pattern followed by the state structure. For example, to use list state with the
+union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`.
+If the method name does not contain the redistribution pattern, *e.g.* `getListState(descriptor)`,
+it simply implies that the basic even-split redistribution scheme will be used.
 
 After initializing the container, we use the `isRestored()` method of the context to check if we are
 recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
@@ -346,6 +363,22 @@ of all objects included by the previous checkpoint, and is then filled with the
 As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
 using the provided `FunctionInitializationContext`.
 
+#### ListCheckpointed
+
+The `ListCheckpointed` interface is a more limited variant of `CheckpointedFunction`,
+which only supports list-style state with the even-split redistribution scheme on restore.
+It also requires the implementation of two methods:
+
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
+
+On `snapshotState()` the operator should return a list of objects to checkpoint and
+`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
+return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
+
 ### Stateful Source Functions
 
 Stateful sources require a bit more care as opposed to other operators.
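The two redistribution schemes described in the documentation changes above can be modeled in plain Java. This is an illustrative sketch under stated assumptions, not Flink's repartitioning code: it assumes even-split hands out contiguous sublists whose sizes differ by at most one (the real assignment of elements to instances may differ, which is why the docs only say `element1` *may* end up in instance 0), and the class `RedistributionSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified model of the two redistribution schemes for
 * list-style operator state. This is not Flink's repartitioning code.
 */
public class RedistributionSketch {

    /** The logical whole state: a concatenation of every old instance's list. */
    public static <T> List<T> concat(List<List<T>> perInstanceState) {
        List<T> whole = new ArrayList<>();
        for (List<T> list : perInstanceState) {
            whole.addAll(list);
        }
        return whole;
    }

    /**
     * Even-split: divides the whole state into newParallelism sublists.
     * Assumption: contiguous sublists whose sizes differ by at most one.
     */
    public static <T> List<List<T>> evenSplit(List<List<T>> perInstanceState, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<T> whole = concat(perInstanceState);
        int base = whole.size() / newParallelism;
        int remainder = whole.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            // the first `remainder` instances receive one extra element; a sublist may be empty
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(whole.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    /** Union: every new instance receives its own copy of the complete list. */
    public static <T> List<List<T>> union(List<List<T>> perInstanceState, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<T> whole = concat(perInstanceState);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(whole));
        }
        return result;
    }

    public static void main(String[] args) {
        // parallelism 1: a single instance whose checkpoint holds element1 and element2
        List<List<String>> before = Arrays.asList(Arrays.asList("element1", "element2"));
        System.out.println(evenSplit(before, 2)); // [[element1], [element2]]
        System.out.println(union(before, 2));     // [[element1, element2], [element1, element2]]
    }
}
```

With the `element1`/`element2` example from the docs, even-split gives each of the two new instances one element, while union gives both instances the full list. Returning `Collections.singletonList(MY_STATE)` under even-split likewise means exactly one new instance receives that element.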

http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index 8be04fc..bf22041 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -33,7 +33,16 @@ public interface OperatorStateStore {
         * Creates (or restores) a list state. Each state is registered under a unique name.
         * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
         *
-        * The items in the list are repartitionable by the system in case of changed operator parallelism.
+        * <p>Note the semantic differences between an operator list state and a keyed list state
+        * (see {@link KeyedStateStore#getListState(ListStateDescriptor)}). In the context of operator state,
+        * the list is a collection of state items that are independent from each other and eligible for redistribution
+        * across operator instances in case of changed operator parallelism. In other words, these state items are
+        * the finest granularity at which non-keyed state can be redistributed, and should not be correlated with
+        * each other.
+        *
+        * <p>The redistribution scheme of this list state upon operator rescaling is a round-robin pattern, such that
+        * the logical whole state (a concatenation of all the lists of state elements previously managed by each operator
+        * before the restore) is evenly divided into as many sublists as there are parallel operators.
         *
         * @param stateDescriptor The descriptor for this state, providing a name and serializer.
         * @param <S> The generic type of the state
@@ -44,7 +53,32 @@ public interface OperatorStateStore {
        <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
 
        /**
+        * Creates (or restores) a list state. Each state is registered under a unique name.
+        * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
+        *
+        * <p>Note the semantic differences between an operator list state and a keyed list state
+        * (see {@link KeyedStateStore#getListState(ListStateDescriptor)}). In the context of operator state,
+        * the list is a collection of state items that are independent from each other and eligible for redistribution
+        * across operator instances in case of changed operator parallelism. In other words, these state items are
+        * the finest granularity at which non-keyed state can be redistributed, and should not be correlated with
+        * each other.
+        *
+        * <p>The redistribution scheme of this list state upon operator rescaling is a broadcast pattern, such that
+        * the logical whole state (a concatenation of all the lists of state elements previously managed by each operator
+        * before the restore) is restored to all parallel operators so that each of them gets the union of all state
+        * items before the restore.
+        *
+        * @param stateDescriptor The descriptor for this state, providing a name and serializer.
+        * @param <S> The generic type of the state
+        *
+        * @return A list for all state partitions.
+        * @throws Exception
+        */
+       <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
+
+       /**
         * Returns a set with the names of all currently registered states.
+        *
         * @return set of names for all registered states.
         */
        Set<String> getRegisteredStateNames();
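Because `getUnionListState` restores the complete list to every parallel instance, each instance typically has to decide which of the restored items it is responsible for. The following is a hypothetical sketch of that step: the `PartitionOffset` item type and the modulo assignment rule are assumptions for illustration (loosely echoing the partition/offset map the docs attribute to the Kafka consumer), not an API introduced by this commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch: after a union restore, every parallel instance sees all
 * items, so each instance keeps only the ones assigned to it. The item type and
 * the modulo assignment rule are illustrative assumptions, not Flink APIs.
 */
public class UnionRestoreFilterSketch {

    /** A hypothetical state item: a partition id and the offset recorded for it. */
    public static final class PartitionOffset {
        public final int partition;
        public final long offset;

        public PartitionOffset(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return partition + "@" + offset;
        }
    }

    /** Keeps the items this instance owns under the hypothetical modulo rule. */
    public static List<PartitionOffset> selectOwn(
            List<PartitionOffset> unionOfAllItems, int instanceIndex, int parallelism) {
        List<PartitionOffset> own = new ArrayList<>();
        for (PartitionOffset item : unionOfAllItems) {
            // floorMod keeps the result non-negative even for negative partition ids
            if (Math.floorMod(item.partition, parallelism) == instanceIndex) {
                own.add(item);
            }
        }
        return own;
    }

    public static void main(String[] args) {
        // the complete list that every instance would receive on a union restore
        List<PartitionOffset> union = Arrays.asList(
                new PartitionOffset(0, 10L),
                new PartitionOffset(1, 20L),
                new PartitionOffset(2, 30L));
        System.out.println(selectOwn(union, 0, 2)); // [0@10, 2@30]
        System.out.println(selectOwn(union, 1, 2)); // [1@20]
    }
}
```

With even-split state this filtering step would not be needed, since each instance already receives a disjoint sublist; the trade-off is that the system, not the function, decides which items land on which instance.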

http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 42d4519..eb3ba01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -95,12 +95,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
                return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
        }
 
-       @SuppressWarnings("unchecked")
-       public <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception {
-               return (ListState<T>) getBroadcastOperatorState(new ListStateDescriptor<>(stateName, javaSerializer));
-       }
-
-       public <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
+       @Override
+       public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
                return getListState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 5a072df..13a6307 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -55,7 +55,7 @@ public class OperatorStateBackendTest {
        public void testCreateOnAbstractStateBackend() throws Exception {
                // we use the memory state backend as a subclass of the AbstractStateBackend
                final AbstractStateBackend abstractStateBackend = new MemoryStateBackend();
-               OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+               final OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
                                createMockEnvironment(), "test-operator");
 
                assertNotNull(operatorStateBackend);
@@ -75,7 +75,7 @@ public class OperatorStateBackendTest {
                final ExecutionConfig cfg = new ExecutionConfig();
                cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class);
 
-               final DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg);
+               final OperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg);
 
                ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class);
                ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class);
@@ -107,7 +107,7 @@ public class OperatorStateBackendTest {
 
        @Test
        public void testRegisterStates() throws Exception {
-               final DefaultOperatorStateBackend operatorStateBackend =
+               final OperatorStateBackend operatorStateBackend =
                                new DefaultOperatorStateBackend(classLoader, new ExecutionConfig());
 
                ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
@@ -140,7 +140,7 @@ public class OperatorStateBackendTest {
                assertEquals(23, it.next());
                assertTrue(!it.hasNext());
 
-               ListState<Serializable> listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3);
+               ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
                assertNotNull(listState3);
                assertEquals(3, operatorStateBackend.getRegisteredStateNames().size());
                assertTrue(!it.hasNext());
@@ -176,7 +176,7 @@ public class OperatorStateBackendTest {
                assertTrue(!it.hasNext());
 
                try {
-                       operatorStateBackend.getBroadcastOperatorState(stateDescriptor2);
+                       operatorStateBackend.getUnionListState(stateDescriptor2);
                        fail("Did not detect changed mode");
                } catch (IllegalStateException ignored) {
 
@@ -194,7 +194,7 @@ public class OperatorStateBackendTest {
        public void testSnapshotEmpty() throws Exception {
                final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
 
-               final DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
+               final OperatorStateBackend operatorStateBackend =
                                abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");
 
                CheckpointStreamFactory streamFactory =
@@ -211,7 +211,7 @@ public class OperatorStateBackendTest {
        public void testSnapshotRestore() throws Exception {
                AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
 
-               DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
+               OperatorStateBackend operatorStateBackend =
                                abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");
 
                ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
@@ -219,7 +219,7 @@ public class OperatorStateBackendTest {
                ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
                ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
                ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
-               ListState<Serializable> listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3);
+               ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
 
                listState1.add(42);
                listState1.add(4711);
@@ -242,8 +242,7 @@ public class OperatorStateBackendTest {
                        operatorStateBackend.close();
                        operatorStateBackend.dispose();
 
-                       //TODO this is temporarily casted to test already functionality that we do not yet expose through public API
-                       operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
+                       operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
                                        createMockEnvironment(),
                                        "testOperator");
 
@@ -253,7 +252,7 @@ public class OperatorStateBackendTest {
 
                        listState1 = operatorStateBackend.getListState(stateDescriptor1);
                        listState2 = operatorStateBackend.getListState(stateDescriptor2);
-                       listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3);
+                       listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
 
                        assertEquals(3, operatorStateBackend.getRegisteredStateNames().size());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index fa05e1d..88dd1dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -145,7 +145,7 @@ public class RescalingITCase extends TestLogger {
        }
 
        /**
-        * Tests that a a job with purely keyed state can be restarted from a savepoint
+        * Tests that a job with purely keyed state can be restarted from a savepoint
         * with a different parallelism.
         */
        public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism) throws Exception {
@@ -993,10 +993,9 @@ public class RescalingITCase extends TestLogger {
                public void initializeState(FunctionInitializationContext context) throws Exception {
 
                        if (broadcast) {
-                               //TODO this is temporarily casted to test already functionality that we do not yet expose through public API
-                               DefaultOperatorStateBackend operatorStateStore = (DefaultOperatorStateBackend) context.getOperatorStateStore();
-                               this.counterPartitions =
-                                               operatorStateStore.getBroadcastSerializableListState("counter_partitions");
+                               this.counterPartitions = context
+                                               .getOperatorStateStore()
+                                               .getUnionListState(new ListStateDescriptor<>("counter_partitions", IntSerializer.INSTANCE));
                        } else {
                                this.counterPartitions = context
                                                .getOperatorStateStore()
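In `DefaultOperatorStateBackend`, both access methods delegate to a single `getListState(stateDescriptor, mode)` call, with `OperatorStateHandle.Mode.SPLIT_DISTRIBUTE` for `getListState` and `OperatorStateHandle.Mode.BROADCAST` for `getUnionListState`, and `OperatorStateBackendTest` expects an `IllegalStateException` when a state name is re-requested with a different mode. The sketch below models that behavior with plain collections; `ModeRegistrySketch` and its methods are hypothetical stand-ins rather than Flink classes, and values are kept in ordinary in-memory lists instead of a checkpointable backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of a store that serves both access methods from one
 * registry and remembers each state's redistribution mode. The names are
 * illustrative stand-ins, not Flink classes; values live in plain lists.
 */
public class ModeRegistrySketch {

    /** Mirrors the two mode names used by DefaultOperatorStateBackend. */
    public enum Mode { SPLIT_DISTRIBUTE, BROADCAST }

    private final Map<String, Mode> modes = new HashMap<>();
    private final Map<String, List<Object>> lists = new HashMap<>();

    /** Even-split list state (no redistribution pattern in the name). */
    public List<Object> getListState(String name) {
        return getListState(name, Mode.SPLIT_DISTRIBUTE);
    }

    /** Union list state, registered with the broadcast mode. */
    public List<Object> getUnionListState(String name) {
        return getListState(name, Mode.BROADCAST);
    }

    private List<Object> getListState(String name, Mode mode) {
        Mode existing = modes.get(name);
        if (existing == null) {
            // first registration under this name: remember its mode
            modes.put(name, mode);
            lists.put(name, new ArrayList<>());
        } else if (existing != mode) {
            // the same name was requested with a different redistribution mode
            throw new IllegalStateException(
                    "State '" + name + "' is registered with mode " + existing + ", not " + mode);
        }
        return lists.get(name);
    }

    public int registeredStateCount() {
        return modes.size();
    }

    public static void main(String[] args) {
        ModeRegistrySketch store = new ModeRegistrySketch();
        store.getListState("test2").add("value");
        store.getUnionListState("test3").add(17);
        try {
            store.getUnionListState("test2");
            System.out.println("changed mode not detected");
        } catch (IllegalStateException expected) {
            System.out.println("changed mode detected");
        }
        System.out.println(store.registeredStateCount()); // 2
    }
}
```

Keeping one registry keyed by state name means a name identifies exactly one state with exactly one redistribution mode, so a mismatch surfaces at registration time rather than on restore.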
