[
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404501#comment-17404501
]
JING ZHANG edited comment on FLINK-23886 at 8/25/21, 2:18 PM:
--------------------------------------------------------------
[~arvid] [~yunta] Thanks a lot for the information.
However, there is one odd point that we cannot explain. We expect the value
of this state to always be empty, because TimerSerializer only writes the key
to state and never writes a value. But with Yun Tang's help, we recovered the
keyed state that stores the RocksDB queue set and found that the value of some
timer state entries is not empty, which is unexpected. Besides, we could
deserialize the erroneous timer state with the key type serializer and value
type serializer of the window state, which is another state in the same
operator.
Could this be caused by a concurrency problem?
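
The expectation described above can be sketched as a check over raw entries: in a set-style layout each timer is encoded entirely in the entry key and the stored value is an empty byte array, so any entry with a non-empty value is suspicious. This is only a minimal sketch. It assumes an in-memory TreeMap standing in for the RocksDB column family, and a made-up byte layout that is not Flink's actual timer format. TimerSetCheck, encodeTimer and findNonEmptyValues are illustrative names, not Flink APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimerSetCheck {

    // Hypothetical layout (an assumption): the whole timer, timestamp then key,
    // is encoded into the entry key; nothing is meant to go into the value.
    static byte[] encodeTimer(long timestamp, String key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(timestamp);
            out.writeUTF(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns the keys of all entries whose value is not empty.
    static List<ByteBuffer> findNonEmptyValues(Map<ByteBuffer, byte[]> entries) {
        List<ByteBuffer> suspicious = new ArrayList<>();
        for (Map.Entry<ByteBuffer, byte[]> entry : entries.entrySet()) {
            if (entry.getValue().length != 0) {
                suspicious.add(entry.getKey());
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        // ByteBuffer compares by content, so it can serve as a sorted map key.
        Map<ByteBuffer, byte[]> store = new TreeMap<>();
        // A well-formed timer entry: key only, empty value.
        store.put(ByteBuffer.wrap(encodeTimer(1_000L, "user-1")), new byte[0]);
        // An entry like the ones found in the recovered state: value is not empty.
        store.put(ByteBuffer.wrap(encodeTimer(2_000L, "user-2")), new byte[] {0x01, 0x02});

        System.out.println("entries with unexpected values: "
                + findNonEmptyValues(store).size());
    }
}
```

Running a check of this kind against the recovered state would at least show how many timer entries carry a value and which key groups they fall into.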
!image-2021-08-25-22-10-32-309.png|width=556,height=440!
> An exception is thrown out when recover job timers from checkpoint file
> -----------------------------------------------------------------------
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Reporter: JING ZHANG
> Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png,
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png,
> image-2021-08-25-17-07-38-327.png
>
>
> A user reported the bug on the [mailing list|http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E].
> I paste the content here.
> Setup Specifics:
> Version: 1.6.2
> RocksDB Map State
> Timers stored in rocksdb
>
> When we have this job running for long periods of time like > 30 days, if
> for some reason the job restarts, we encounter "Error while deserializing the
> element". Is this a known issue fixed in later versions? I see some changes
> to code for FLINK-10175, but we don't use any queryable state
>
> Below is the stack trace
>
> org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
> at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
> ... 20 more
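
The java.io.EOFException at the root of the trace is what a DataInputStream-based reader raises when a serializer asks for more bytes than the stored element holds. The following is a minimal sketch of that failure mode using only the JDK. It assumes a made-up two-field layout (a long timestamp followed by a UTF string key) that is not Flink's actual timer format, and TruncatedTimerDemo with its methods are illustrative names only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class TruncatedTimerDemo {

    // Writes the assumed layout: 8-byte timestamp, then a length-prefixed UTF key.
    static byte[] write(long timestamp, String key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(timestamp);
            out.writeUTF(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the element back with the same layout and returns the key.
    static String readKey(byte[] element) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(element));
        in.readLong();       // skip the timestamp
        return in.readUTF(); // throws EOFException if the bytes run out
    }

    // True if reading the element ends in an EOFException.
    static boolean failsWithEof(byte[] element) {
        try {
            readKey(element);
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] full = write(42L, "user-42");
        // Simulate an element whose bytes do not match what the reader expects.
        byte[] truncated = Arrays.copyOf(full, full.length - 2);

        System.out.println("full element fails with EOF: " + failsWithEof(full));
        System.out.println("truncated element fails with EOF: " + failsWithEof(truncated));
    }
}
```

The same kind of exception would also be expected if the bytes were written by a different serializer than the one used to read them, which is one way to read the observation that some timer entries deserialize with the window state serializers.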