[FLINK-5991] [DataStream] Expose Union ListState for operator state

This commit exposes the union list state scheme for managed operator state. The actual functionality was already previously added to the `DefaultOperatorStateBackend`, so this change simply exposes the feature through the `OperatorStateStore` interface.
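For illustration only, the two redistribution schemes can be modelled in a few lines of plain Java, independent of Flink. This is a sketch under stated assumptions, not the backend's implementation. `RedistributionSketch`, `evenSplit` and `union` are hypothetical names. The even split is modelled as round-robin assignment, where element `i` of the concatenated state goes to instance `i % newParallelism`, following the "round-robin pattern" wording of the `getListState` Javadoc. Flink's actual repartitioner may order elements differently.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink code) of the two redistribution schemes for
 * list-style managed operator state. Each inner list is the state held by one
 * parallel instance; the result has one inner list per new instance.
 */
public class RedistributionSketch {

    /** The logical whole state: a concatenation of all per-instance lists. */
    static <T> List<T> concatenate(List<List<T>> previous) {
        List<T> whole = new ArrayList<>();
        for (List<T> part : previous) {
            whole.addAll(part);
        }
        return whole;
    }

    /**
     * Even-split, modelled here as round-robin (an assumption): element i of the
     * whole state goes to instance i % newParallelism, so sublist sizes differ
     * by at most one and some sublists may be empty.
     */
    static <T> List<List<T>> evenSplit(List<List<T>> previous, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        List<T> whole = concatenate(previous);
        for (int i = 0; i < whole.size(); i++) {
            result.get(i % newParallelism).add(whole.get(i));
        }
        return result;
    }

    /** Union: every new instance receives its own copy of the complete whole state. */
    static <T> List<List<T>> union(List<List<T>> previous, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<T> whole = concatenate(previous);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(whole));
        }
        return result;
    }

    public static void main(String[] args) {
        // Parallelism 1 before rescaling, holding element1 and element2; rescale to 2.
        List<List<String>> before = List.of(List.of("element1", "element2"));
        System.out.println("even-split: " + evenSplit(before, 2)); // [[element1], [element2]]
        System.out.println("union:      " + union(before, 2));     // [[element1, element2], [element1, element2]]
    }
}
```

The `main` method replays the documentation's example of `element1` and `element2` rescaled from parallelism 1 to 2. With union redistribution, both new instances hold both elements.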

This commit also updates the documentation for managed operator state so that it covers the new union list state scheme. It strengthens the difference between keyed and non-keyed state data structures by emphasizing the semantic differences in the state access method Javadocs.

This closes #3508.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ef4900a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ef4900a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ef4900a

Branch: refs/heads/master
Commit: 2ef4900aa279e75844a9f8536cfe007c2542187d
Parents: a1aab64
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Wed Apr 19 02:53:31 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Wed Apr 19 10:33:29 2017 +0800

----------------------------------------------------------------------
 docs/dev/stream/state.md                        | 109 ++++++++++++-------
 .../api/common/state/OperatorStateStore.java    |  36 +++++-
 .../state/DefaultOperatorStateBackend.java      |   8 +-
 .../runtime/state/OperatorStateBackendTest.java |  21 ++--
 .../test/checkpointing/RescalingITCase.java     |   9 +-
 5 files changed, 122 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 40522e1..276842d 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -64,16 +64,12 @@ for one or more Key Groups.

 With *Operator State* (or *non-keyed state*), each operator state is
 bound to one parallel operator instance.
-The Kafka source connector is a good motivating example for the use of Operator State
-in Flink. Each parallel instance of this Kafka consumer maintains a map
+The [Kafka Connector](../connectors/kafka.html) is a good motivating example for the use of Operator State
+in Flink. Each parallel instance of the Kafka consumer maintains a map
 of topic partitions and offsets as its Operator State.

 The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution; the following are currently defined:
-
-  - **List-style redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
-    all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
-    Each operator gets a sublist, which can be empty, or contain one or more elements.
+parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.

 ## Raw and Managed State

@@ -233,32 +229,13 @@ val counts: DataStream[(String, Int)] = stream

 ## Using Managed Operator State

-A stateful function can implement either the more general `CheckpointedFunction`
+To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
 interface, or the `ListCheckpointed<T extends Serializable>` interface.

-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
-thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-##### ListCheckpointed
+#### CheckpointedFunction

-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-On `snapshotState()` the operator should return a list of objects to checkpoint and
-`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
-return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
-
-##### CheckpointedFunction
-
-The `CheckpointedFunction` interface also requires the implementation of two methods:
+The `CheckpointedFunction` interface provides access to non-keyed state with different
+redistribution schemes. It requires the implementation of two methods:

 {% highlight java %}
 void snapshotState(FunctionSnapshotContext context) throws Exception;
@@ -266,12 +243,30 @@ void snapshotState(FunctionSnapshotContext context) throws Exception;
 void initializeState(FunctionInitializationContext context) throws Exception;
 {% endhighlight %}

-Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
-or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
+Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
+is called every time the user-defined function is initialized, be that when the function is first initialized
+or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
 only the place where different types of state are initialized, but also where state recovery logic is included.

-This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
-uses state to buffer elements before sending them to the outside world:
+Currently, list-style managed operator state is supported. The state
+is expected to be a `List` of *serializable* objects, independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
+non-keyed state can be redistributed. Depending on the state accessing method,
+the following redistribution schemes are defined:
+
+  - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
+    all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
+    Each operator gets a sublist, which can be empty, or contain one or more elements.
+    As an example, if with parallelism 1 the checkpointed state of an operator
+    contains elements `element1` and `element2`, when increasing the parallelism to 2, `element1` may end up in operator instance 0,
+    while `element2` will go to operator instance 1.
+
+  - **Union redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
+    all lists. On restore/redistribution, each operator gets the complete list of state elements.
+
+Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction`
+to buffer elements before sending them to the outside world. It demonstrates
+the basic even-split redistribution list state:

 {% highlight java %}
 public class BufferingSink
@@ -311,8 +306,13 @@ public class BufferingSink

     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        checkpointedState = context.getOperatorStateStore().
-            getSerializableListState("buffered-elements");
+        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+            new ListStateDescriptor<>(
+                "buffered-elements",
+                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
+                Tuple2.of(0L, 0L));
+
+        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

         if (context.isRestored()) {
             for (Tuple2<String, Integer> element : checkpointedState.get()) {
@@ -329,12 +329,29 @@ public class BufferingSink
 }
 {% endhighlight %}

-
 The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize
 the non-keyed state "containers". These are a container of type `ListState` where the non-keyed state objects
 are going to be stored upon checkpointing.

-`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
+Note how the state is initialized, similar to keyed state,
+with a `StateDescriptor` that contains the state name and information
+about the type of the value that the state holds:
+
+{% highlight java %}
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "buffered-elements",
+        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
+        Tuple2.of(0L, 0L));
+
+checkpointedState = context.getOperatorStateStore().getListState(descriptor);
+{% endhighlight %}
+
+The naming convention of the state access methods contain its redistribution
+pattern followed by its state structure. For example, to use list state with the
+union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`.
+If the method name does not contain the redistribution pattern, *e.g.* `getListState(descriptor)`,
+it simply implies that the basic even-split redistribution scheme will be used.

 After initializing the container, we use the `isRestored()` method of the context to check if we are
 recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
@@ -346,6 +363,22 @@ of all objects included by the previous checkpoint, and is then filled with the
 As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
 using the provided `FunctionInitializationContext`.

+#### ListCheckpointed
+
+The `ListCheckpointed` interface is a more limited variant of `CheckpointedFunction`,
+which only supports list-style state with even-split redistribution scheme on restore.
+It also requires the implementation of two methods:
+
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
+
+On `snapshotState()` the operator should return a list of objects to checkpoint and
+`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
+return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
+
 ### Stateful Source Functions

 Stateful sources require a bit more care as opposed to other operators.


http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index 8be04fc..bf22041 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -33,7 +33,16 @@ public interface OperatorStateStore {
 	 * Creates (or restores) a list state. Each state is registered under a unique name.
 	 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
 	 *
-	 * The items in the list are repartitionable by the system in case of changed operator parallelism.
+	 * <p>Note the semantic differences between an operator list state and a keyed list state
+	 * (see {@link KeyedStateStore#getListState(ListStateDescriptor)}). Under the context of operator state,
+	 * the list is a collection of state items that are independent from each other and eligible for redistribution
+	 * across operator instances in case of changed operator parallelism. In other words, these state items are
+	 * the finest granularity at which non-keyed state can be redistributed, and should not be correlated with
+	 * each other.
+	 *
+	 * <p>The redistribution scheme of this list state upon operator rescaling is a round-robin pattern, such that
+	 * the logical whole state (a concatenation of all the lists of state elements previously managed by each operator
+	 * before the restore) is evenly divided into as many sublists as there are parallel operators.
 	 *
 	 * @param stateDescriptor The descriptor for this state, providing a name and serializer.
 	 * @param <S> The generic type of the state
@@ -44,7 +53,32 @@ public interface OperatorStateStore {
 	<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

 	/**
+	 * Creates (or restores) a list state. Each state is registered under a unique name.
+	 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
+	 *
+	 * <p>Note the semantic differences between an operator list state and a keyed list state
+	 * (see {@link KeyedStateStore#getListState(ListStateDescriptor)}). Under the context of operator state,
+	 * the list is a collection of state items that are independent from each other and eligible for redistribution
+	 * across operator instances in case of changed operator parallelism. In other words, these state items are
+	 * the finest granularity at which non-keyed state can be redistributed, and should not be correlated with
+	 * each other.
+	 *
+	 * <p>The redistribution scheme of this list state upon operator rescaling is a broadcast pattern, such that
+	 * the logical whole state (a concatenation of all the lists of state elements previously managed by each operator
+	 * before the restore) is restored to all parallel operators so that each of them will get the union of all state
+	 * items before the restore.
+	 *
+	 * @param stateDescriptor The descriptor for this state, providing a name and serializer.
+	 * @param <S> The generic type of the state
+	 *
+	 * @return A list for all state partitions.
+	 * @throws Exception
+	 */
+	<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
+
+	/**
 	 * Returns a set with the names of all currently registered states.
+	 *
 	 * @return set of names for all registered states.
 	 */
 	Set<String> getRegisteredStateNames();


http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 42d4519..eb3ba01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -95,12 +95,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
 	}

-	@SuppressWarnings("unchecked")
-	public <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception {
-		return (ListState<T>) getBroadcastOperatorState(new ListStateDescriptor<>(stateName, javaSerializer));
-	}
-
-	public <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
+	@Override
+	public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
 		return getListState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST);
 	}



http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 5a072df..13a6307 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -55,7 +55,7 @@ public class OperatorStateBackendTest {
 	public void testCreateOnAbstractStateBackend() throws Exception {
 		// we use the memory state backend as a subclass of the AbstractStateBackend
 		final AbstractStateBackend abstractStateBackend = new MemoryStateBackend();
-		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+		final OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
 				createMockEnvironment(), "test-operator");

 		assertNotNull(operatorStateBackend);
@@ -75,7 +75,7 @@ public class OperatorStateBackendTest {
 		final ExecutionConfig cfg = new ExecutionConfig();
 		cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class);

-		final DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg);
+		final OperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg);

 		ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class);
 		ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class);
@@ -107,7 +107,7 @@ public class OperatorStateBackendTest {

 	@Test
 	public void testRegisterStates() throws Exception {
-		final DefaultOperatorStateBackend operatorStateBackend =
+		final OperatorStateBackend operatorStateBackend =
 				new DefaultOperatorStateBackend(classLoader, new ExecutionConfig());

 		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
@@ -140,7 +140,7 @@ public class OperatorStateBackendTest {
 		assertEquals(23, it.next());
 		assertTrue(!it.hasNext());

-		ListState<Serializable> listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3);
+		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
 		assertNotNull(listState3);
 		assertEquals(3, operatorStateBackend.getRegisteredStateNames().size());
 		assertTrue(!it.hasNext());
@@ -176,7 +176,7 @@ public class OperatorStateBackendTest {
 		assertTrue(!it.hasNext());

 		try {
-			operatorStateBackend.getBroadcastOperatorState(stateDescriptor2);
+			operatorStateBackend.getUnionListState(stateDescriptor2);
 			fail("Did not detect changed mode");
 		} catch (IllegalStateException ignored) {

@@ -194,7 +194,7 @@ public class OperatorStateBackendTest {
 	public void testSnapshotEmpty() throws Exception {
 		final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);

-		final DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
+		final OperatorStateBackend operatorStateBackend =
 				abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");

 		CheckpointStreamFactory streamFactory =
@@ -211,7 +211,7 @@ public class OperatorStateBackendTest {
 	public void testSnapshotRestore() throws Exception {
 		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);

-		DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
+		OperatorStateBackend operatorStateBackend =
 				abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");

 		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
@@ -219,7 +219,7 @@ public class OperatorStateBackendTest {
 		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
 		ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
 		ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
-		ListState<Serializable> listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3);
+		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);

 		listState1.add(42);
 		listState1.add(4711);
@@ -242,8 +242,7 @@ public class OperatorStateBackendTest {
 		operatorStateBackend.close();
 		operatorStateBackend.dispose();

-		//TODO this is temporarily casted to test already functionality that we do not yet expose through public API
-		operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
+		operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
 				createMockEnvironment(),
 				"testOperator");

@@ -253,7 +252,7 @@ public class OperatorStateBackendTest {

 		listState1 = operatorStateBackend.getListState(stateDescriptor1);
 		listState2 = operatorStateBackend.getListState(stateDescriptor2);
-		listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3);
+		listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);

 		assertEquals(3, operatorStateBackend.getRegisteredStateNames().size());



http://git-wip-us.apache.org/repos/asf/flink/blob/2ef4900a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index fa05e1d..88dd1dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -145,7 +145,7 @@ public class RescalingITCase extends TestLogger {
 	}

 	/**
-	 * Tests that a a job with purely keyed state can be restarted from a savepoint
+	 * Tests that a job with purely keyed state can be restarted from a savepoint
 	 * with a different parallelism.
 	 */
 	public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism) throws Exception {
@@ -993,10 +993,9 @@ public class RescalingITCase extends TestLogger {
 		public void initializeState(FunctionInitializationContext context) throws Exception {

 			if (broadcast) {
-				//TODO this is temporarily casted to test already functionality that we do not yet expose through public API
-				DefaultOperatorStateBackend operatorStateStore = (DefaultOperatorStateBackend) context.getOperatorStateStore();
-				this.counterPartitions =
-						operatorStateStore.getBroadcastSerializableListState("counter_partitions");
+				this.counterPartitions = context
+						.getOperatorStateStore()
+						.getUnionListState(new ListStateDescriptor<>("counter_partitions", IntSerializer.INSTANCE));
 			} else {
 				this.counterPartitions = context
 						.getOperatorStateStore()
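The `OperatorStateBackendTest` changes above expect the backend to throw an `IllegalStateException` in one case: when a state name that was registered with one redistribution mode is requested again with the other ("Did not detect changed mode"). A minimal plain-Java sketch of that bookkeeping follows. It assumes only what the test shows. `ModeRegistrySketch` is a hypothetical class, and its `Mode` enum just mirrors the `SPLIT_DISTRIBUTE`/`BROADCAST` names of `OperatorStateHandle.Mode`. The sketch also leaves out serializers and actual state contents, which the real `DefaultOperatorStateBackend` also manages.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch (not Flink code): remembers the redistribution mode each
 * named operator state was first registered with, and rejects a later access
 * to the same name under a different mode.
 */
public class ModeRegistrySketch {

    /** Mirrors the names of OperatorStateHandle.Mode; not the Flink enum itself. */
    enum Mode { SPLIT_DISTRIBUTE, BROADCAST }

    private final Map<String, Mode> modesByName = new HashMap<>();

    /** Stands in for getListState(descriptor): even-split redistribution. */
    void getListState(String name) {
        register(name, Mode.SPLIT_DISTRIBUTE);
    }

    /** Stands in for getUnionListState(descriptor): union (broadcast) redistribution. */
    void getUnionListState(String name) {
        register(name, Mode.BROADCAST);
    }

    Set<String> getRegisteredStateNames() {
        return Collections.unmodifiableSet(modesByName.keySet());
    }

    private void register(String name, Mode requested) {
        // putIfAbsent returns the previously registered mode, or null on first registration.
        Mode existing = modesByName.putIfAbsent(name, requested);
        if (existing != null && existing != requested) {
            throw new IllegalStateException("State '" + name + "' was registered with mode "
                    + existing + " and cannot be accessed with mode " + requested);
        }
    }

    public static void main(String[] args) {
        ModeRegistrySketch store = new ModeRegistrySketch();
        store.getListState("test1");
        store.getListState("test2");
        store.getUnionListState("test3");
        store.getListState("test1"); // same name, same mode: allowed

        try {
            store.getUnionListState("test2"); // same name, different mode
            System.out.println("mode change not detected");
        } catch (IllegalStateException expected) {
            System.out.println("detected changed mode");
        }
        System.out.println(store.getRegisteredStateNames().size());
    }
}
```

Keying the check on the state name alone reflects the Javadoc statement that each state is registered under a unique name. Re-requesting a name with the same mode returns the existing registration instead of creating a second one.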
