[
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404867#comment-17404867
]
JING ZHANG commented on FLINK-23886:
------------------------------------
[~ahasib]
My previous response on the mailing list was too brief, sorry about that. I
would like to add some information about the configuration
'state.backend.rocksdb.timer-service.factory: HEAP'. Please correct me if
anything is wrong.
# This configuration can help avoid the EOFException when recovering the
RocksDB priority queue set from a savepoint/checkpoint file. If we use HEAP for
timers, the timer state and the other operator state are no longer mixed up in
one RocksDB instance, so the exception is avoided. The jobs that added this
configuration have not encountered the problem again.
# If we switch the timer store from RocksDB to HEAP, the job indeed cannot be
restored from its existing savepoint/checkpoint. If a user restores the job
from a savepoint/checkpoint file by mistake, the job does not throw an
exception, but the timer state is lost. This is unreasonable, and we could open
a Jira to fix it. We suggest that our users restart the job in backfill mode
instead of restoring it from checkpoint/savepoint files.
# With 'state.backend.rocksdb.timer-service.factory: HEAP', timers are kept on
the JVM heap, so the job may consume more memory. Therefore, we need to monitor
memory usage closely and increase memory resources if necessary.
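Point 2 can be illustrated with a small, self-contained sketch. It is not Flink code: the `Snapshot` class, the assumption that a snapshot records which factory stored its timers, and the `failOnMismatch` switch are all hypothetical. Only the option key and its `HEAP`/`ROCKSDB` values come from Flink. The lenient path models the behaviour described above (no exception, empty timer state); the strict path shows the kind of fail-fast check a follow-up Jira could ask for.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch, not Flink code: silent timer loss vs. a fail-fast restore check. */
class TimerFactoryRestoreCheck {

    // Real Flink option key, quoted from the comment above.
    static final String TIMER_FACTORY_KEY = "state.backend.rocksdb.timer-service.factory";

    enum TimerFactory { HEAP, ROCKSDB }

    // Assumption for this sketch: a snapshot records which factory stored its timers.
    static final class Snapshot {
        final TimerFactory timersStoredIn;
        final List<Long> timerTimestamps;

        Snapshot(TimerFactory timersStoredIn, List<Long> timerTimestamps) {
            this.timersStoredIn = timersStoredIn;
            this.timerTimestamps = timerTimestamps;
        }
    }

    // Reads the factory from a flat key/value view of the configuration; no default is assumed.
    static TimerFactory configuredFactory(Map<String, String> conf) {
        String value = conf.get(TIMER_FACTORY_KEY);
        if (value == null) {
            throw new IllegalArgumentException(TIMER_FACTORY_KEY + " is not set");
        }
        return TimerFactory.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    /**
     * Restores timers. failOnMismatch == false models the reported behaviour: no exception,
     * but the timers are gone. failOnMismatch == true rejects the restore instead.
     */
    static List<Long> restoreTimers(Snapshot snapshot, TimerFactory configured, boolean failOnMismatch) {
        if (snapshot.timersStoredIn == configured) {
            return new ArrayList<>(snapshot.timerTimestamps);
        }
        if (failOnMismatch) {
            throw new IllegalStateException("Snapshot timers were stored in " + snapshot.timersStoredIn
                    + " but " + TIMER_FACTORY_KEY + " is " + configured
                    + "; restoring would drop " + snapshot.timerTimestamps.size() + " timer(s).");
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(TIMER_FACTORY_KEY, "HEAP");
        Snapshot snapshot = new Snapshot(TimerFactory.ROCKSDB, Arrays.asList(1_000L, 2_000L));

        List<Long> lenient = restoreTimers(snapshot, configuredFactory(conf), false);
        System.out.println("timers after lenient restore: " + lenient.size());
        try {
            restoreTimers(snapshot, configuredFactory(conf), true);
        } catch (IllegalStateException e) {
            System.out.println("strict restore rejected: " + e.getMessage());
        }
    }
}
```

Running `main` with the factory set to `HEAP` against a snapshot whose timers were written by `ROCKSDB` shows an empty lenient restore and a rejected strict restore, which is the difference between the current behaviour and the proposed fix.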
> An exception is thrown out when recover job timers from checkpoint file
> -----------------------------------------------------------------------
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Reporter: JING ZHANG
> Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png,
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png,
> image-2021-08-25-17-07-38-327.png
>
>
> A user reported the bug on the
> [mailing list|http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E].
> The content is pasted below.
> Setup Specifics:
> Version: 1.6.2
> RocksDB Map State
> Timers stored in rocksdb
>
> When this job has been running for a long period, e.g. more than 30 days,
> and then restarts for some reason, we encounter "Error while deserializing
> the element". Is this a known issue that is fixed in later versions? I see
> some code changes for FLINK-10175, but we don't use any queryable state.
>
> Below is the stack trace
>
> org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
> at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
> ... 20 more
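The stack trace shows that the failure surfaces in `KeyedProcessOperator.open()`, before any timer fires: `KeyGroupPartitionedPriorityQueue` is built as a heap of per-key-group queues, each comparison in `siftUp` calls `peek()` on a RocksDB-backed queue, and `peek()` has to deserialize the head timer, where `StringSerializer` runs out of bytes. The sketch below models that path with the Java standard library only. The classes, the `readUTF`-based byte layout and the way the entry is corrupted (truncation) are assumptions for illustration; it does not reproduce Flink's `TimerSerializer` format or explain why the stored bytes were inconsistent. As in the trace, the failure is wrapped in a runtime exception whose cause is `java.io.EOFException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Simplified, hypothetical model of the restore path in the stack trace; not Flink's classes. */
class TimerRestoreModel {

    // Illustrative timer: a timestamp plus a String key (the trace shows a Row key with String fields).
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }
    }

    // Assumed byte layout for this sketch: 8-byte timestamp, then a modified-UTF-8 string.
    static byte[] serialize(Timer timer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(timer.timestamp);
            out.writeUTF(timer.key);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Running out of bytes while reading the key throws EOFException, wrapped like in the trace.
    static Timer deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return new Timer(in.readLong(), in.readUTF());
        } catch (IOException e) {
            throw new RuntimeException("Error while deserializing the element.", e);
        }
    }

    // One key group whose timers exist only as sorted serialized bytes, like the RocksDB-backed set.
    static final class SerializedSubQueue {
        private final List<byte[]> sortedEntries;

        SerializedSubQueue(List<byte[]> sortedEntries) { this.sortedEntries = sortedEntries; }

        Timer peek() {
            return sortedEntries.isEmpty() ? null : deserialize(sortedEntries.get(0));
        }
    }

    // Ordering key groups by their head timer forces a peek(), i.e. a deserialization, per comparison.
    static PriorityQueue<SerializedSubQueue> buildHeap(List<SerializedSubQueue> keyGroups) {
        Comparator<SerializedSubQueue> byHeadTimestamp = Comparator.comparingLong(queue -> {
            Timer head = queue.peek();
            return head == null ? Long.MAX_VALUE : head.timestamp;
        });
        PriorityQueue<SerializedSubQueue> heap = new PriorityQueue<>(byHeadTimestamp);
        for (SerializedSubQueue queue : keyGroups) {
            heap.add(queue);
        }
        return heap;
    }

    public static void main(String[] args) {
        byte[] healthy = serialize(new Timer(1_000L, "user-42"));
        // Hypothetical corruption: drop the last bytes of an otherwise valid record.
        byte[] truncated = Arrays.copyOf(healthy, healthy.length - 3);
        List<SerializedSubQueue> keyGroups = Arrays.asList(
                new SerializedSubQueue(Collections.singletonList(healthy)),
                new SerializedSubQueue(Collections.singletonList(truncated)));
        try {
            buildHeap(keyGroups);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " Caused by: " + e.getCause());
        }
    }
}
```

In this model, once there are at least two key groups, a single unreadable head entry in any of them is enough to fail heap construction, and with it the operator's `open()`.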
--
This message was sent by Atlassian Jira
(v8.3.4#803005)