[
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15307536#comment-15307536
]
ASF GitHub Bot commented on FLINK-3779:
---------------------------------------
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/2051#issuecomment-222648249
Hi,
just some high-level remarks about the API of `KeyedStream` and
`QueryableStateStream`. You could parameterize both of them by the state type.
That would also give you a nice, generic way of accessing the
different types of state on the query side. Let me quickly show what I mean.
`KeyedStream` would have this method:
```
<S extends State> QueryableStateStream<KEY, S> asQueryableState(
    String queryableStateName,
    StateDescriptor<S, ?> stateDescriptor);
```
The signature of `QueryableStateStream` would look like this:
```
public class QueryableStateStream<K, S extends State> {
    // Assumed field: the name passed to asQueryableState()
    private String stateName;
    private StateDescriptor<S, ?> stateDescriptor;

    /**
     * Read the state represented by this stream using a state client.
     */
    public S read(K key, QueryableStateClient client) {
        // The binder wraps the serialized bytes returned by the client
        return stateDescriptor.bind(
            new QueryStateBinder(client.read(key, stateName)));
    }
}
```
The nice thing about `StateDescriptor.bind()` is that you can use it to
create a state reader based on the state type. You pass in a custom
`StateBackend` (this should really be called `StateBinder`, btw) that
constructs a state reader, which in turn deserializes the bytes read from
the state client.
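To make the `bind()` idea a bit more concrete, here is a minimal, self-contained sketch of the dispatch. Everything in it is a simplified, hypothetical stand-in and not Flink's actual classes: the interfaces are reduced to a single read-only `ValueState`, the descriptor carries a plain `Function<byte[], T>` instead of a real serializer, and `QueryStateBinder` simply receives the bytes that a state client would have fetched.

```
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical, simplified stand-ins; these are NOT Flink's real interfaces.
final class StateBinderSketch {

    /** Marker for all state types. */
    interface State {}

    /** Read-only view of a single value. */
    interface ValueState<T> extends State {
        T value();
    }

    /** Creates a concrete state object for each kind of state (the "binder"). */
    interface StateBinder {
        <T> ValueState<T> createValueState(ValueStateDescriptor<T> descriptor);
    }

    /** Describes a state and knows which binder method matches its type. */
    abstract static class StateDescriptor<S extends State, T> {
        // Assumption: a plain function stands in for a real serializer.
        final Function<byte[], T> deserializer;

        StateDescriptor(Function<byte[], T> deserializer) {
            this.deserializer = deserializer;
        }

        abstract S bind(StateBinder binder);
    }

    static final class ValueStateDescriptor<T>
            extends StateDescriptor<ValueState<T>, T> {

        ValueStateDescriptor(Function<byte[], T> deserializer) {
            super(deserializer);
        }

        @Override
        ValueState<T> bind(StateBinder binder) {
            // The descriptor picks the binder method for its own state type.
            return binder.createValueState(this);
        }
    }

    /** Query-side binder: wraps bytes that a client has already fetched. */
    static final class QueryStateBinder implements StateBinder {
        private final byte[] serializedValue;

        QueryStateBinder(byte[] serializedValue) {
            this.serializedValue = serializedValue;
        }

        @Override
        public <T> ValueState<T> createValueState(ValueStateDescriptor<T> descriptor) {
            // Deserialize lazily, each time the value is read.
            return () -> descriptor.deserializer.apply(serializedValue);
        }
    }

    public static void main(String[] args) {
        // Pretend these bytes came back from the state client.
        byte[] fromClient = "42".getBytes(StandardCharsets.UTF_8);

        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));

        ValueState<Integer> state = descriptor.bind(new QueryStateBinder(fromClient));
        System.out.println(state.value()); // prints 42
    }
}
```

The point of the sketch is that the caller never switches on the state type: the descriptor decides which binder method to call, so supporting another kind of state on the query side would only mean adding one more method to the binder.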
> Add support for queryable state
> -------------------------------
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee
> fault-tolerant processing of streams. Users can work with both
> non-partitioned (Checkpointed interface) and partitioned state
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state
> that are all scoped to the key of the current input element. This type of
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by
> supporting queries against the partitioned key-value state.
> This will help to eliminate the need for distributed operations/transactions
> with external systems such as key-value stores which are often the bottleneck
> in practice. Exposing the local state to the outside moves a good part of the
> database work into the stream processor, allowing both high-throughput
> queries and immediate access to the computed state.
> This is the initial design doc for the feature:
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
> Feel free to comment.