[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405647#comment-17405647
 ] 

Yun Tang commented on FLINK-23886:
----------------------------------

Thanks for Piotr's explaniation, [~qingru zhang] do you have legacy souce or 
used the checkpoint lock in the same task?

Moreover, could you help to do things below to make things more easy to figure 
out?
1. reproduce it on a supported Flink version (Flink-1.13 is better)
2. share the code or provide a minimalistic example that reproduces this problem

> An exception is thrown out when recover job timers from checkpoint file
> -----------------------------------------------------------------------
>
>                 Key: FLINK-23886
>                 URL: https://issues.apache.org/jira/browse/FLINK-23886
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>            Reporter: JING ZHANG
>            Priority: Major
>         Attachments: image-2021-08-25-16-38-04-023.png, 
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, 
> image-2021-08-25-17-07-38-327.png
>
>
> A user report the bug in the [mailist. 
> |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I
>  paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
> at 
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
> ... 20 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to