[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387930#comment-15387930
 ] 

ASF GitHub Bot commented on FLINK-3779:
---------------------------------------

Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Hi,
    
    Continuing the discussion from the mailing list: I was able to get past the
    NettyConfig problem once I ran Flink in cluster mode (I would still like to
    know whether there is a way to run in local mode, so that I can avoid
    running SBT assembly every time).
    
    But now I am stuck at the error message "KvState does not hold any state for
    key/namespace.", which I believe is caused by my KeySerializer. Since I am
    running the QueryClient as a separate application, I don't have access to my
    queryableState and so cannot call `queryableState.getKeySerializer`.
    
    My key is a tuple of (Long, String), and this is the naive serializer that I
    wrote (it is probably wrong; I have never written a serializer before):
    
    ```
    import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
    import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    import org.apache.flink.types.StringValue

    class KeySerializer extends TypeSerializerSingleton[(Long, String)] {

      private val EMPTY: (Long, String) = (0L, "")

      override def createInstance(): (Long, String) = EMPTY

      // The String field makes records variable-length, which is signalled by -1.
      override def getLength: Int = -1

      // canEqual is asked about another serializer, not about a key value.
      override def canEqual(o: scala.Any): Boolean = o.isInstanceOf[KeySerializer]

      override def copy(t: (Long, String)): (Long, String) = t

      override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t

      override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit = {
        dataOutputView.writeLong(dataInputView.readLong())
        StringValue.copyString(dataInputView, dataOutputView)
      }

      override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
        dataOutputView.writeLong(t._1)
        StringValue.writeString(t._2, dataOutputView)
      }

      // Tuples of Long and String are immutable, so copy() can return its argument.
      override def isImmutableType: Boolean = true

      override def deserialize(dataInputView: DataInputView): (Long, String) = {
        val l = dataInputView.readLong()
        val s = StringValue.readString(dataInputView)
        (l, s)
      }

      override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) =
        deserialize(dataInputView)
    }
    ```
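    
    An alternative that avoids hand-writing the serializer would be to derive
    one from the key's type information in the client application. A minimal
    sketch, assuming the job is written with the Scala API and keys the stream
    by a `(Long, String)` tuple, and that flink-scala is on the client's
    classpath; the object name `ClientKeySerializer` is only illustrative, and
    whether the result matches the serializer the job actually uses is not
    confirmed here:
    
    ```scala
    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.api.scala._

    // Hypothetical holder object; not part of Flink.
    object ClientKeySerializer {
      // createTypeInformation is the Scala API macro that derives TypeInformation
      // for the tuple type; createSerializer then builds a serializer from it.
      // Assumption: a default ExecutionConfig is equivalent to the job's config
      // as far as this key type is concerned.
      val keySerializer: TypeSerializer[(Long, String)] =
        createTypeInformation[(Long, String)].createSerializer(new ExecutionConfig)
    }
    ```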
    
    Can you tell me what I am doing wrong here? Thanks!


> Add support for queryable state
> -------------------------------
>
>                 Key: FLINK-3779
>                 URL: https://issues.apache.org/jira/browse/FLINK-3779
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
