[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388116#comment-15388116
 ] 

ASF GitHub Bot commented on FLINK-3779:
---------------------------------------

Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Thanks Ufuk & Stephen for the reply,
    
    I tried the serializers you suggested:
    ```
    val typeHint = new TypeHint[Tuple2[Long,String]](){}
    val serializer = TypeInformation.of(typeHint).createSerializer(null)
    
    //also tried this
    val fieldSerializers = Array[TypeSerializer[_]](
      StringSerializer.INSTANCE, LongSerializer.INSTANCE)
    val serializer2 = new TupleSerializer(
      classOf[Tuple2[Long,String]].asInstanceOf[Class[_]].asInstanceOf[Class[Tuple2[String, Long]]],
      fieldSerializers)
    ```
    
    But both give me a compilation error at:
    ```
    val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
          key,
          serializer2,
          VoidNamespace.INSTANCE,
          VoidNamespaceSerializer.INSTANCE)
    ```
    The compilation error is:
    ```
    Error:(43, 7) type mismatch;
     found   : org.apache.flink.api.common.typeutils.TypeSerializer[org.apache.flink.api.java.tuple.Tuple2[Long,String]]
     required: org.apache.flink.api.common.typeutils.TypeSerializer[java.io.Serializable]
    Note: org.apache.flink.api.java.tuple.Tuple2[Long,String] <: java.io.Serializable, but Java-defined class TypeSerializer is invariant in type T.
    You may wish to investigate a wildcard type such as `_ <: java.io.Serializable`. (SLS 3.2.10)
          serializer,
          ^
    ```
    
    I had seen this before when I tried to set the serializer from `queryableState.getKeySerializer`.
    
    Note: It works fine when I use the longer version of the serializer that I created.
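    
    A minimal sketch of what the compiler note about invariance describes. It uses hypothetical stand-in types (`Ser`, `serializeKey`) rather than the Flink classes. It also assumes the mismatch comes from the key's static type differing from the serializer's type parameter, which the snippets above do not confirm:
    ```scala
    // Hypothetical stand-in for an invariant serializer type (not a Flink class).
    trait Ser[T] { def write(value: T): String }

    object InvarianceSketch {
      // Shaped like a generic method taking a key and a serializer for that key:
      // K must be the same type in both parameters.
      def serializeKey[K](key: K, ser: Ser[K]): String = ser.write(key)

      // Serializer for a pair of a boxed java.lang.Long and a String.
      val pairSer: Ser[(java.lang.Long, String)] = new Ser[(java.lang.Long, String)] {
        def write(value: (java.lang.Long, String)): String = s"${value._1}|${value._2}"
      }

      // A key whose static type is wider than the serializer's type parameter.
      val wideKey: java.io.Serializable = (java.lang.Long.valueOf(1L), "a")

      // serializeKey(wideKey, pairSer)
      // ^ rejected by the compiler: Ser is invariant, so Ser[(java.lang.Long, String)]
      //   is not accepted where Ser[java.io.Serializable] is required.

      // Giving the key exactly the serializer's element type lets K be inferred consistently.
      val exactKey: (java.lang.Long, String) = (java.lang.Long.valueOf(1L), "a")
      val out: String = serializeKey[(java.lang.Long, String)](exactKey, pairSer)

      def main(args: Array[String]): Unit = println(out)
    }
    ```
    If the same cause applies here, declaring `key` with exactly the type the serializer was created for (or passing the type argument explicitly) might be worth trying.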


> Add support for queryable state
> -------------------------------
>
>                 Key: FLINK-3779
>                 URL: https://issues.apache.org/jira/browse/FLINK-3779
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
