[
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17229790#comment-17229790
]
Tzu-Li (Gordon) Tai edited comment on FLINK-19300 at 11/11/20, 7:06 AM:
------------------------------------------------------------------------
This looks like a real issue: the typical {{read}} vs. {{readFully}} mistake.
This read path should only occur when users combine the RocksDB state backend
with heap-based timers (using the heap backend alone is fine and would not run
into this).
[~xianggao], to help me understand the full problem: in your scenario, I assume
the timer losses happen because the
{{InternalTimerServiceSerializationProxy}} silently skips reading the timers,
rather than because of an {{IOException}} from the incorrect read attempt
(which would eventually fail the restore instead of losing timers). Could you
clarify whether my assumption is correct?
As for the fix, I would suggest reusing the
{{java.io.DataInput#readFully}} method instead of re-implementing it:
{code}
byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
// readFully blocks until tmp is completely filled, or throws EOFException
inputView.readFully(tmp);
{code}
You can catch {{EOFException}} to detect that the end of the stream was reached
before enough bytes were available.
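To illustrate the difference, here is a minimal, self-contained sketch (not Flink code; {{OneByteStream}} is a hypothetical wrapper that simulates a remote stream, such as S3, delivering short reads):
{code}
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class ReadVsReadFully {

    /** Hypothetical stream that returns at most one byte per read call,
     *  mimicking a remote stream (e.g. S3) that delivers short reads. */
    static class OneByteStream extends FilterInputStream {
        OneByteStream(InputStream in) { super(in); }
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 1));
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] identifier = {1, 2, 3, 4};

        // Buggy pattern: a single read() call may fill only part of the array.
        byte[] partial = new byte[identifier.length];
        int n = new OneByteStream(new ByteArrayInputStream(identifier)).read(partial);
        System.out.println("read() returned " + n + " byte(s)"); // 1, not 4

        // Correct pattern: readFully() loops internally until the array is
        // completely filled, or throws EOFException if the stream ends first.
        byte[] full = new byte[identifier.length];
        new DataInputStream(new OneByteStream(new ByteArrayInputStream(identifier)))
                .readFully(full);
        System.out.println("readFully() filled: " + Arrays.toString(full));
    }
}
{code}
The buggy single {{read()}} silently leaves the tail of the array at its default value, which is exactly how an identifier check can fail without any exception.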
> Timer loss after restoring from savepoint
> -----------------------------------------
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Reporter: Xiang Gao
> Priority: Critical
>
> While using heap-based timers, we are seeing occasional timer loss after
> restoring the program from a savepoint, especially when using a remote
> savepoint storage (S3).
> After some investigation, the issue seems to be related to [this line in
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
> When trying to check the VERSIONED_IDENTIFIER, the input stream may not
> guarantee filling the byte array, causing timers to be dropped for the
> affected key group.
> It should keep reading until the expected number of bytes has actually been
> read or the end of the stream has been reached.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)