[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15307536#comment-15307536
 ] 

ASF GitHub Bot commented on FLINK-3779:
---------------------------------------

Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/2051#issuecomment-222648249
  
    Hi,
    just some high-level remarks about the API of `KeyedStream` and `QueryableStateStream`. You could parameterize them both by the `State` type. That would also give you a nice, generic way of accessing the different types of state on the query side. Let me quickly show what I mean. `KeyedStream` would have this method:
    
    ```
    <S extends State> QueryableStateStream<KEY, S> asQueryableState(
        String queryableStateName,
        StateDescriptor<S, ?> stateDescriptor);
    ```
    
    The signature of `QueryableStateStream` would be like this:
    
    ```
    public class QueryableStateStream<K, S extends State> {
        private StateDescriptor<S, ?> stateDescriptor;
        
        /**
         * Read the state represented by this stream using a state client
         */ 
        public S read(K key, QueryableStateClient client) {
            return stateDescriptor.bind(
                new QueryStateBinder(client.read(key, stateName)));
        }
    }
    ```
    
    The nice thing about `StateDescriptor.bind()` is that you can use it to create a state reader based on the state type. You pass in a custom `StateBackend` (this should really be called `StateBinder`, btw), which then constructs a state reader that deserializes the bytes read from the state client.
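
To make the `bind()` idea above concrete, here is a minimal, self-contained sketch of the dispatch pattern. It does not use Flink's classes: every type in it (`StateBinder`, `ValueStateDescriptor`, `QueryStateBinder`, `QueryableStateClient`, and the rest) is a hypothetical stand-in written for illustration, the descriptor takes one type parameter instead of two, and the "serializer" is just a plain function over bytes. Read it as an assumption about how the pieces might fit together, not as the actual API.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Illustrative sketch only: all types are hypothetical stand-ins, not Flink's API. */
final class QueryableStateSketch {

    /** Marker interface standing in for a state handle. */
    interface State {}

    /** Read-only view of a single value. */
    interface ValueState<T> extends State {
        T value();
    }

    /** Hypothetical binder: one factory method per kind of state. */
    interface StateBinder {
        <T> ValueState<T> createValueState(ValueStateDescriptor<T> descriptor);
    }

    /** A descriptor knows its own state type and picks the matching binder method. */
    interface StateDescriptor<S extends State> {
        S bind(StateBinder binder);
    }

    static final class ValueStateDescriptor<T> implements StateDescriptor<ValueState<T>> {
        final Function<byte[], T> deserializer; // stand-in for a real serializer

        ValueStateDescriptor(Function<byte[], T> deserializer) {
            this.deserializer = deserializer;
        }

        @Override
        public ValueState<T> bind(StateBinder binder) {
            return binder.createValueState(this);
        }
    }

    /** Binder that wraps the bytes returned by a query in a deserializing reader. */
    static final class QueryStateBinder implements StateBinder {
        private final byte[] serialized;

        QueryStateBinder(byte[] serialized) {
            this.serialized = serialized;
        }

        @Override
        public <T> ValueState<T> createValueState(ValueStateDescriptor<T> descriptor) {
            // Deserialization happens lazily, when value() is called.
            return () -> descriptor.deserializer.apply(serialized);
        }
    }

    /** Stand-in for the state client: an in-memory map from (state name, key) to bytes. */
    static final class QueryableStateClient {
        private final Map<String, byte[]> store = new HashMap<>();

        void put(String stateName, Object key, byte[] bytes) {
            store.put(stateName + "/" + key, bytes);
        }

        byte[] read(Object key, String stateName) {
            return store.get(stateName + "/" + key);
        }
    }

    static final class QueryableStateStream<K, S extends State> {
        private final String stateName;
        private final StateDescriptor<S> stateDescriptor;

        QueryableStateStream(String stateName, StateDescriptor<S> stateDescriptor) {
            this.stateName = stateName;
            this.stateDescriptor = stateDescriptor;
        }

        /** Reads the state for the given key through the client. */
        S read(K key, QueryableStateClient client) {
            return stateDescriptor.bind(new QueryStateBinder(client.read(key, stateName)));
        }
    }

    /** Stores one serialized count, then reads it back through the typed stream handle. */
    static Integer demo() {
        QueryableStateClient client = new QueryableStateClient();
        client.put("word-count", "flink", "42".getBytes(StandardCharsets.UTF_8));

        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
            bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        QueryableStateStream<String, ValueState<Integer>> stream =
            new QueryableStateStream<>("word-count", descriptor);

        return stream.read("flink", client).value();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that the caller of `read()` never names a concrete reader class: the descriptor's type parameter determines which binder method runs, so supporting another kind of state would mean adding one more descriptor and one more binder method.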



> Add support for queryable state
> -------------------------------
>
>                 Key: FLINK-3779
>                 URL: https://issues.apache.org/jira/browse/FLINK-3779
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
