[ https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15306793#comment-15306793 ]
ASF GitHub Bot commented on FLINK-3779:
---------------------------------------
GitHub user uce opened a pull request:
https://github.com/apache/flink/pull/2051
[FLINK-3779] Add support for queryable state
First of all, thanks to @tillrohrmann, @aljoscha, and @StephanEwen for
discussions during and before implementing this first version. The initial
design document can be found here:
https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g
**In a nutshell, this feature allows users to query Flink's managed
partitioned state from outside of Flink. This eliminates the need for
distributed operations/transactions with external systems such as key-value
stores which are often the bottleneck in practice.**
# APIs
## QueryableStateStream
The following methods have been added as `@PublicEvolving` to `KeyedStream`:
```java
// ValueState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ValueStateDescriptor stateDescriptor)

// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)

// ListState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ListStateDescriptor stateDescriptor)

// FoldingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    FoldingStateDescriptor stateDescriptor)

// ReducingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ReducingStateDescriptor stateDescriptor)
```
A call to these methods returns a `QueryableStateStream`, which cannot be
further transformed and currently only holds the key and value serializers of
the queryable state. It's comparable to a sink, after which no further
transformations are possible.
The `QueryableStateStream` gets translated to an operator, which uses all
incoming records to update the queryable state instance. If you have a program
like `stream.keyBy(0).asQueryableState("query-name")`, all records of the keyed
stream will be used to update the state instance, either via `update` for
`ValueState` or `add` for `AppendingState`. This acts like the Scala API's
`flatMapWithState`. For an example, take a look at `QueryableStateITCase` in
`flink-tests`.
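To make the update semantics concrete, here is a minimal plain-Java sketch of what such an operator does per record. It does not use Flink: `QueryableValueOperator` and `QueryableListOperator` are hypothetical names, and per-key state is modelled with a `HashMap` instead of Flink's managed state. The value variant overwrites the key's state (like `update`), while the list variant appends to it (like `add`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of the ValueState variant: each record replaces its key's state. */
class QueryableValueOperator<K, V> {
    private final Function<V, K> keySelector;
    private final Map<K, V> stateByKey = new HashMap<>();

    QueryableValueOperator(Function<V, K> keySelector) {
        this.keySelector = keySelector;
    }

    /** Called for every incoming record; mirrors ValueState#update. */
    void processElement(V record) {
        stateByKey.put(keySelector.apply(record), record);
    }

    /** What a query for this key would see; null if the key was never seen. */
    V query(K key) {
        return stateByKey.get(key);
    }
}

/** Hypothetical model of the ListState variant: each record is appended to its key's state. */
class QueryableListOperator<K, V> {
    private final Function<V, K> keySelector;
    private final Map<K, List<V>> stateByKey = new HashMap<>();

    QueryableListOperator(Function<V, K> keySelector) {
        this.keySelector = keySelector;
    }

    /** Called for every incoming record; mirrors AppendingState#add. */
    void processElement(V record) {
        stateByKey.computeIfAbsent(keySelector.apply(record), k -> new ArrayList<>()).add(record);
    }

    /** What a query for this key would see; empty if the key was never seen. */
    List<V> query(K key) {
        return stateByKey.getOrDefault(key, Collections.emptyList());
    }
}
```

For records `a,1`, `a,2`, `b,3` keyed by their first field, a query for `a` sees `a,2` in the value variant and `[a,1, a,2]` in the list variant.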
I understand that these are quite a few methods to add to the public APIs,
but I am not aware of another way to do it if we want to ensure type safety
when providing the state descriptor. @aljoscha, you have quite some experience
with designing APIs. Is there maybe a better way? And what do you (and others)
think about the name `QueryableStateStream`? We could also go for the shorter
`QueryableState` or something else even. I'm open to suggestions.
## QueryableStateClient
This is the client used to query KvState instances. The query method is:
```java
Future<byte[]> getKvState(
    JobID jobID,
    String queryableStateName,
    int keyHashCode,
    byte[] serializedKeyAndNamespace)
```
A call to the method returns a Future that eventually holds the serialized
state value for the queryable state instance identified by `queryableStateName`
of the job with ID `jobID`. The `keyHashCode` is the hash code of the key as
returned by `Object.hashCode()`, and `serializedKeyAndNamespace` is the
serialized key and namespace. The client is asynchronous and can be shared by
multiple threads. An example can be seen in `QueryableStateITCase` (in
`flink-tests`).
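The calling pattern can be sketched in plain Java. This is not the Flink client: `KvQueryClient`, `FakeKvQueryClient` and `QueryExample` are hypothetical, and for self-containment the sketch uses a `String` job ID and a `CompletableFuture` in place of `JobID` and the client's `Future`. It shows the key's `hashCode()` being passed alongside the serialized key/namespace and several queries being in flight at once on a single client instance. The UTF-8 key bytes stand in for a real key/namespace serialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical client interface modelled on getKvState (String job ID, CompletableFuture). */
interface KvQueryClient {
    CompletableFuture<byte[]> getKvState(
            String jobId, String queryableStateName, int keyHashCode, byte[] serializedKeyAndNamespace);
}

/** In-memory fake that answers asynchronously; its map is thread-safe, so one instance can be shared. */
class FakeKvQueryClient implements KvQueryClient {
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    void put(String jobId, String name, byte[] serializedKeyAndNamespace, byte[] serializedValue) {
        store.put(id(jobId, name, serializedKeyAndNamespace), serializedValue);
    }

    @Override
    public CompletableFuture<byte[]> getKvState(
            String jobId, String name, int keyHashCode, byte[] serializedKeyAndNamespace) {
        // A real client would use keyHashCode to pick the TaskManager; the fake ignores it.
        return CompletableFuture.supplyAsync(() -> {
            byte[] value = store.get(id(jobId, name, serializedKeyAndNamespace));
            if (value == null) {
                throw new IllegalStateException("No state for the requested key");
            }
            return value;
        });
    }

    private static String id(String jobId, String name, byte[] serializedKeyAndNamespace) {
        return jobId + "/" + name + "/" + new String(serializedKeyAndNamespace, StandardCharsets.UTF_8);
    }
}

class QueryExample {
    /** Fires one asynchronous query per key, then waits for all results in order. */
    static List<String> queryAll(KvQueryClient client, String jobId, String name, List<String> keys) {
        List<CompletableFuture<byte[]>> pending = new ArrayList<>();
        for (String key : keys) {
            byte[] serializedKey = key.getBytes(StandardCharsets.UTF_8); // placeholder serialization
            pending.add(client.getKvState(jobId, name, key.hashCode(), serializedKey));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<byte[]> future : pending) {
            results.add(new String(future.join(), StandardCharsets.UTF_8));
        }
        return results;
    }
}
```

A failed lookup surfaces as an exceptionally completed future, so callers decide per query whether to retry, for example after refreshing location information.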
The current implementation is low-level in the sense that it only works
with serialized data, both for providing the key/namespace and for the returned
results. It's the responsibility of the user (or of follow-up utilities) to
set up the serializers for this. The advantage is that the query services don't
have to deal with class loading issues and similar concerns.
There are some serialization utils for key/namespace and value
serialization included in `KvStateRequestSerializer`.
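The kind of helper meant here can be sketched as below. The byte layout (a length-prefixed UTF-8 key followed by a length-prefixed UTF-8 namespace, and values as 8-byte big-endian longs) is an assumption chosen for illustration and is not necessarily what `KvStateRequestSerializer` writes; `KeyNamespaceCodec` is a hypothetical name. In a real job the key and namespace would be written with the job's own serializers, which is exactly the setup the low-level API leaves to the user.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec; the layout is illustrative, not Flink's wire format. */
final class KeyNamespaceCodec {
    private KeyNamespaceCodec() {}

    /** Writes [keyLength][keyBytes][namespaceLength][namespaceBytes]. */
    static byte[] serializeKeyAndNamespace(String key, String namespace) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] n = namespace.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + k.length + n.length);
        buffer.putInt(k.length).put(k).putInt(n.length).put(n);
        return buffer.array();
    }

    /** Reads the key and namespace back in the order they were written. */
    static String[] deserializeKeyAndNamespace(byte[] serialized) {
        ByteBuffer buffer = ByteBuffer.wrap(serialized);
        byte[] k = new byte[buffer.getInt()];
        buffer.get(k);
        byte[] n = new byte[buffer.getInt()];
        buffer.get(n);
        return new String[] {new String(k, StandardCharsets.UTF_8), new String(n, StandardCharsets.UTF_8)};
    }

    /** Serializes a long state value (e.g. a count) as 8 big-endian bytes. */
    static byte[] serializeLongValue(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Inverse of serializeLongValue. */
    static long deserializeLongValue(byte[] serialized) {
        return ByteBuffer.wrap(serialized).getLong();
    }
}
```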
# Implementation
The following sections highlight the main changes/additions.
## Added `setQueryable(String)` to `StateDescriptor`
KvState instances are published for queries when they have a queryable
state name set (see below). For this purpose, I've introduced the
`setQueryable(String)` method to the `StateDescriptor` interface. The provided
name is different from the state descriptor name we already had before. For
queries, only the name provided in `setQueryable(String)` is relevant.
The name needs to be unique per job. If it is not, the job fails at runtime
with an unrecoverable exception. Unfortunately, this cannot be checked before
the job is submitted.
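The distinction between the two names, and the uniqueness rule, can be modelled in a few lines of plain Java. `SketchStateDescriptor` and `QueryableNameRegistry` are hypothetical; the sketch assumes that parallel subtasks of the same operator may register the same queryable name, while a second operator using that name is rejected. Where exactly Flink detects the clash at runtime is not modelled here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical descriptor: the state name and the queryable name are independent. */
class SketchStateDescriptor {
    private final String name;
    private String queryableStateName; // null means "not queryable"

    SketchStateDescriptor(String name) {
        this.name = name;
    }

    void setQueryable(String queryableStateName) {
        if (queryableStateName == null) {
            throw new NullPointerException("queryableStateName");
        }
        this.queryableStateName = queryableStateName;
    }

    boolean isQueryable() {
        return queryableStateName != null;
    }

    String getName() {
        return name;
    }

    String getQueryableStateName() {
        return queryableStateName;
    }
}

/** Hypothetical per-job check: a queryable name may belong to only one operator. */
class QueryableNameRegistry {
    private final Map<String, String> operatorByQueryableName = new ConcurrentHashMap<>();

    void register(String operatorId, SketchStateDescriptor descriptor) {
        if (!descriptor.isQueryable()) {
            return; // state without a queryable name is never published
        }
        String owner = operatorByQueryableName.putIfAbsent(descriptor.getQueryableStateName(), operatorId);
        if (owner != null && !owner.equals(operatorId)) {
            // Only detectable at runtime, once both operators register their state.
            throw new IllegalStateException("Queryable state name '"
                    + descriptor.getQueryableStateName() + "' is already used by operator " + owner);
        }
    }
}
```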
## Added `byte[] getSerializedValue(byte[] serializedKeyNamespace)` to `KvState`
This method is implemented by all KvState instances and is used to serve
queries. Since each state instance holds references to its serializers, it
takes care of serialization itself and the caller doesn't have to.
For Java heap-backed state, we deserialize the key and namespace, access
the state for the key/namespace, and serialize the result. For RocksDB-backed
state, we can directly use the `serializedKeyAndNamespace` to look up the
serialized result.
Furthermore, with the RocksDB state backend we don't have to worry about
concurrent accesses to the state instance, whereas `AbstractHeapState` needs
`ConcurrentHashMap`s for its internal key/namespace maps if the state instance
is queryable.
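For the heap-backed case, the three steps (deserialize key and namespace, look up, serialize the result) can be sketched as follows. `HeapLongValueState` is hypothetical: it stores `long` values in nested `ConcurrentHashMap`s keyed by namespace and key, and uses an assumed length-prefixed UTF-8 layout for the key/namespace bytes instead of Flink's serializers. The concurrent maps let a query thread read while the task thread writes; they also reject `null` keys, which is why a `null` namespace cannot be used.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical heap-backed value state: namespace -> key -> long value. */
class HeapLongValueState {
    // Concurrent maps: the task thread writes while query threads read.
    private final Map<String, Map<String, Long>> state = new ConcurrentHashMap<>();

    void update(String namespace, String key, long value) {
        state.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>()).put(key, value);
    }

    /** Deserializes key and namespace, looks up the value, serializes it; null if absent. */
    byte[] getSerializedValue(byte[] serializedKeyAndNamespace) {
        ByteBuffer in = ByteBuffer.wrap(serializedKeyAndNamespace);
        String key = readString(in);
        String namespace = readString(in);
        Map<String, Long> byKey = state.get(namespace);
        Long value = (byKey == null) ? null : byKey.get(key);
        if (value == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Assumed layout: [keyLength][keyBytes][namespaceLength][namespaceBytes]. */
    static byte[] serializeKeyAndNamespace(String key, String namespace) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] n = namespace.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES * 2 + k.length + n.length)
                .putInt(k.length).put(k).putInt(n.length).put(n).array();
    }

    private static String readString(ByteBuffer in) {
        byte[] bytes = new byte[in.getInt()];
        in.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```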
## Added `KvStateRegistry` to TaskManager
This is a very simple registry on the TaskManager. The
`AbstractStateBackend` registers `KvState` instances at runtime on:
- first call to `getPartitionedState()`, which creates the `State`
instance, or
- `injectKeyValueStateSnapshots()`.
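The registration flow on a TaskManager can be sketched as follows. `SketchKvStateRegistry` and `SketchStateBackend` are hypothetical stand-ins: state is created and registered lazily on the first `getPartitionedState()` call, later calls return the same instance, and disposing the backend unregisters everything. The `notifications` list stands in for the messages reported to the JobManager. The restore path (`injectKeyValueStateSnapshots()`) is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-TaskManager registry; lookups may come from network threads. */
class SketchKvStateRegistry {
    private final Map<Long, Object> stateById = new ConcurrentHashMap<>();
    private final Map<Long, String> nameById = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();
    // Stands in for the messages sent to the JobManager.
    final List<String> notifications = new CopyOnWriteArrayList<>();

    long register(String queryableStateName, Object kvState) {
        long id = nextId.getAndIncrement();
        stateById.put(id, kvState);
        nameById.put(id, queryableStateName);
        notifications.add("registered:" + queryableStateName);
        return id;
    }

    void unregister(long id) {
        stateById.remove(id);
        String name = nameById.remove(id);
        if (name != null) {
            notifications.add("unregistered:" + name);
        }
    }

    Object lookup(long id) {
        return stateById.get(id);
    }

    int size() {
        return stateById.size();
    }
}

/** Hypothetical backend: creates and registers state on first access, unregisters on dispose. */
class SketchStateBackend {
    private final SketchKvStateRegistry registry;
    private final Map<String, Object> stateByName = new HashMap<>();
    private final List<Long> registeredIds = new ArrayList<>();

    SketchStateBackend(SketchKvStateRegistry registry) {
        this.registry = registry;
    }

    Object getPartitionedState(String queryableStateName) {
        return stateByName.computeIfAbsent(queryableStateName, name -> {
            Object state = new ConcurrentHashMap<String, Long>(); // placeholder state instance
            registeredIds.add(registry.register(name, state));
            return state;
        });
    }

    void dispose() {
        for (long id : registeredIds) {
            registry.unregister(id);
        }
        registeredIds.clear();
        stateByName.clear();
    }
}
```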
At the moment, there are essentially two variants of state backends:
either RocksDB-backed or Java heap-backed
(`FileSystemStateBackend`, `MemoryStateBackend`).
A note on restoring: RocksDB state will only be published for queries on
`getPartitionedState()` whereas Java heap-backed state is already published on
`injectKeyValueStateSnapshots()`. This has to do with the way that the RocksDB
state backend organizes the state internally. I didn't want to change a lot
there and I think it's a fair compromise for the first version.
## Added `KvStateLocationRegistry` to JobManager
The `KvStateRegistry` of each TaskManager reports the registered state
instances to the JobManager, where they are aggregated by the
`KvStateLocationRegistry`. The purpose of this is to allow clients to query the
JobManager for location information about the state they want to query. There
is one `KvStateLocation` for each registered queryable state, which maps each
key group index (currently the subtask index) to the server address holding
the state instance.
With this, the client can figure out which TaskManager to query for each
key. Communication between the client and the JobManager is only needed when
the location is unknown or out of sync.
The lookup of `KvStateLocation` instances happens via Akka.
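How a client might use this information can be sketched in plain Java. `SketchKvStateLocation` and `SketchLocationCache` are hypothetical. The key-to-key-group mapping shown here (`Math.floorMod(keyHashCode, numKeyGroups)`) is an assumption for illustration; it has to match whatever partitioning the job actually uses, which may hash the key differently. The Akka-based JobManager lookup is replaced by a plain `Function`, and a location is fetched again only after it has been invalidated, e.g. after a failed request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical location info: one server address per key group index. */
final class SketchKvStateLocation {
    private final String[] serverAddressByKeyGroup;

    SketchKvStateLocation(String... serverAddressByKeyGroup) {
        this.serverAddressByKeyGroup = serverAddressByKeyGroup.clone();
    }

    String serverAddressFor(int keyHashCode) {
        // Assumed mapping; floorMod keeps the index non-negative for negative hash codes.
        int keyGroupIndex = Math.floorMod(keyHashCode, serverAddressByKeyGroup.length);
        return serverAddressByKeyGroup[keyGroupIndex];
    }
}

/** Hypothetical client-side cache in front of the JobManager lookup. */
final class SketchLocationCache {
    private final Function<String, SketchKvStateLocation> jobManagerLookup;
    private final Map<String, SketchKvStateLocation> cache = new ConcurrentHashMap<>();
    private final AtomicInteger remoteLookups = new AtomicInteger();

    SketchLocationCache(Function<String, SketchKvStateLocation> jobManagerLookup) {
        this.jobManagerLookup = jobManagerLookup;
    }

    SketchKvStateLocation get(String queryableStateName) {
        return cache.computeIfAbsent(queryableStateName, name -> {
            remoteLookups.incrementAndGet();
            return jobManagerLookup.apply(name);
        });
    }

    /** Drops a location known to be out of sync so the next get() asks the JobManager again. */
    void invalidate(String queryableStateName) {
        cache.remove(queryableStateName);
    }

    int remoteLookups() {
        return remoteLookups.get();
    }
}
```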
## Added `KvStateClient` and `KvStateServer` for network transfers
The `KvStateClient` and `KvStateServer` are responsible for the actual data
exchange via TCP. Each TaskManager runs a single `KvStateServer`, which queries
the local `KvStateRegistry` on incoming requests.
Connections are established and released by the client. The server closes a
connection only on failures. Each client connection can be shared by
multiple threads.
Both client and server keep track of request statistics (how many requests
were made and how long they took).
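Because a connection can be shared by several threads, such counters need to be safe under concurrent updates. A minimal sketch, assuming atomic counters and nanosecond timing; `SketchRequestStats` and its `record` method are hypothetical and not the classes used in this PR.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical request statistics that many threads may update at once. */
final class SketchRequestStats {
    private final AtomicLong numRequests = new AtomicLong();
    private final AtomicLong numSuccessful = new AtomicLong();
    private final AtomicLong numFailed = new AtomicLong();
    private final AtomicLong totalSuccessNanos = new AtomicLong();

    /** Runs one request, counting it and timing it if it succeeds. */
    <T> T record(Supplier<T> request) {
        numRequests.incrementAndGet();
        long start = System.nanoTime();
        try {
            T result = request.get();
            numSuccessful.incrementAndGet();
            totalSuccessNanos.addAndGet(System.nanoTime() - start);
            return result;
        } catch (RuntimeException e) {
            numFailed.incrementAndGet();
            throw e;
        }
    }

    long numRequests() { return numRequests.get(); }

    long numSuccessful() { return numSuccessful.get(); }

    long numFailed() { return numFailed.get(); }

    /** Average duration of successful requests in nanoseconds; 0 if there were none. */
    double averageSuccessNanos() {
        long n = numSuccessful.get();
        return n == 0 ? 0.0 : (double) totalSuccessNanos.get() / n;
    }
}
```

Each counter is updated atomically, but the two reads in `averageSuccessNanos()` are not taken together, so the average is only approximate while requests are in flight.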
# Limitations
- User docs are sparse. I wanted to wait for some initial feedback with
this PR before writing anything.
- The queryable state life-cycle is bound to the life-cycle of the job,
i.e. tasks register queryable state on startup and unregister it on dispose. In
future versions, it is desirable to decouple this in order to allow queries
after a task finishes and to speed up recovery via state replication.
- Notifications about available `KvState` happen via a simple `tell`. This
should be made more robust with `ask`s and acknowledgements. This was
kept simple on purpose in anticipation of possible state replication
improvements (see the previous point), which probably need a different model of
reporting available state.
- The server and client keep track of statistics for queries. These are
currently disabled by default as they would not be exposed anywhere. As soon as
there is better support to publish these numbers via the Metrics system, we
should enable the stats.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/flink 3779-queryable_state
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2051.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2051
----
commit c1e8466f09787d229b78019e4d33d1c64932c74e
Author: Ufuk Celebi
Date: 2016-05-30T11:42:39Z
[FLINK-3779] [runtime] Add getSerializedValue(byte[]) to KvState
[statebackend-rocksdb, core, streaming-java]
- Adds getSerializedValue(byte[]) to KvState, which is used to query single
KvState instances. Serialization is left to the KvState in order not to
burden the accessor (e.g. the querying network thread) with setting up or
accessing the serializers.
- Adds a queryable flag to the StateDescriptor. State that sets a queryable
state name will be published for queries to the KvStateRegistry.
- Prohibits null namespaces and enforces VoidNamespace instead. This makes
things more explicit. Furthermore, the concurrent map used for queryable
memory state does not allow null keys.
commit cd651a651c247e3638d81c3db32b9619a35aaf2a
Author: Ufuk Celebi
Date: 2016-05-30T12:03:35Z
[FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
[streaming-java]
- Adds a KvStateRegistry per TaskManager, at which created KvState instances
are registered/unregistered.
- Registered KvState instances are reported to the JobManager, which can be
queried for KvStateLocation.
commit 3405e4a5fc3429f211b5ed0e4d6da394f7d5bb4e
Author: Ufuk Celebi
Date: 2016-05-30T12:00:49Z
[FLINK-3779] [runtime] Add KvState network client and server
- Adds a Netty-based server and client to query KvState instances, which have
been published to the KvStateRegistry.
commit bb4baa33aaca8475d46da4c6c0ec3cbecbefa81a
Author: Ufuk Celebi
Date: 2016-05-30T12:08:03Z
[FLINK-3779] [runtime] Add KvStateLocation lookup service
- Adds an Akka-based KvStateLocation lookup service to be used by the client
to look up location information.
commit 7256e1dae34d6dbb8d96560bd0eac7a51ced7515
Author: Ufuk Celebi
Date: 2016-05-30T12:08:24Z
[FLINK-3779] [runtime] Add QueryableStateClient
- Adds a client, which works with the network client and location lookup
service to query KvState instances.
- Furthermore, location information is cached.
commit d7e602a55a4cfa6825b2126d51ed711d3c6ea866
Author: Ufuk Celebi
Date: 2016-05-30T12:08:34Z
[FLINK-3779] [streaming-java, streaming-scala] Add QueryableStateStream to KeyedStream
[runtime, test-utils, tests]
- Exposes queryable state on the API via
KeyedStream#asQueryableState(String, StateDescriptor).
This creates an operator, which consumes the keyed stream and exposes the
stream as queryable state.
----
> Add support for queryable state
> -------------------------------
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee
> fault-tolerant processing of streams. Users can work with both
> non-partitioned (Checkpointed interface) and partitioned state
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state
> that are all scoped to the key of the current input element. This type of
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions
> with external systems such as key-value stores which are often the bottleneck
> in practice. Exposing the local state to the outside moves a good part of the
> database work into the stream processor, allowing both high throughput
> queries and immediate access to the computed state.
> This is the initial design doc for the feature:
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
> Feel free to comment.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)