[ https://issues.apache.org/jira/browse/FLINK-5801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876416#comment-15876416 ]
Ufuk Celebi commented on FLINK-5801:
------------------------------------

The root cause of this is a mismatch in the key serializers. The client Scala code creates a generic KryoSerializer, because the scala.Long type cannot be captured at runtime. A workaround is to use a macro when creating the serializers:

{code}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.streaming.api.scala._
...
// createTypeInformation is a macro, so the Long key type is resolved at compile time
val keySerializer = createTypeInformation[Long].createSerializer(new ExecutionConfig)
{code}

I created FLINK-5876 for adding a note to the docs. Sorry for the inconvenience this has caused.

> Queryable State from Scala job/client fails with key of type Long
> -----------------------------------------------------------------
>
>                 Key: FLINK-5801
>                 URL: https://issues.apache.org/jira/browse/FLINK-5801
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.2.0
>         Environment: Flink 1.2.0
> Scala 2.10
>            Reporter: Patrick Lucas
>            Assignee: Ufuk Celebi
>         Attachments: OrderFulfillment.scala, OrderFulfillmentStateQuery.scala
>
>
> While working on a demo Flink job, to try out Queryable State, I exposed some state of type Long -> custom class via the Query server. However, the query server returned an exception when I tried to send a query:
> {noformat}
> Exception in thread "main" java.lang.RuntimeException: Failed to query state backend for query 0. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
>     at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeKeyAndNamespace(KvStateRequestSerializer.java:392)
>     at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:130)
>     at org.apache.flink.runtime.query.netty.KvStateServerHandler$AsyncKvStateQueryTask.run(KvStateServerHandler.java:220)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException
>     at org.apache.flink.runtime.util.DataInputDeserializer.readLong(DataInputDeserializer.java:217)
>     at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:69)
>     at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:27)
>     at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeKeyAndNamespace(KvStateRequestSerializer.java:379)
>     ... 7 more
>     at org.apache.flink.runtime.query.netty.KvStateServerHandler$AsyncKvStateQueryTask.run(KvStateServerHandler.java:257)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> I banged my head against this for a while, then per [~jgrier]'s suggestion I tried simply changing the key from Long to String (modifying the two {{keyBy}} calls and the {{keySerializer}} {{TypeHint}} in the attached code) and it started working perfectly.
> cc [~uce]
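
The EOFException raised inside LongSerializer.deserialize is what one would expect when the server reads a fixed 8-byte long from a key that the client wrote in a different, shorter format. The following is a minimal sketch of that failure mode using only java.io, with no Flink or Kryo involved. The names SerializerMismatchDemo, writeFixed, writeCompact and readFixed are hypothetical, and the "compact" format is an invented stand-in for a mismatched serializer, not Kryo's actual wire format.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

object SerializerMismatchDemo {

  // Fixed-width encoding: always 8 bytes, as written by DataOutput.writeLong.
  def writeFixed(key: Long): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    new DataOutputStream(bytes).writeLong(key)
    bytes.toByteArray
  }

  // Hypothetical compact encoding (an assumption for illustration only):
  // one tag byte, then only as many low-order bytes as the value needs.
  def writeCompact(key: Long): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    bytes.write(0x01)                  // made-up type tag
    bytes.write((key & 0xFF).toInt)    // lowest byte is always written
    var rest = key >>> 8
    while (rest != 0L) {
      bytes.write((rest & 0xFF).toInt)
      rest = rest >>> 8
    }
    bytes.toByteArray
  }

  // Reader that expects the fixed-width format, like a server-side long deserializer.
  def readFixed(data: Array[Byte]): Either[String, Long] =
    try Right(new DataInputStream(new ByteArrayInputStream(data)).readLong())
    catch { case _: EOFException => Left("EOFException") }

  def main(args: Array[String]): Unit = {
    val key = 42L
    println(readFixed(writeFixed(key)))   // matching formats: prints Right(42)
    println(readFixed(writeCompact(key))) // mismatched formats: prints Left(EOFException)
  }
}
```

With a small key the compact form is only 2 bytes long, so the fixed-width reader runs out of input before it has its 8 bytes. Making both sides use the same serializer, as in the workaround from the comment, removes the mismatch.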