[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17229790#comment-17229790
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-19300 at 11/11/20, 7:05 AM:
------------------------------------------------------------------------

This looks like a real issue, the typical {{read}} v.s. {{readFully}} mistake.

This read path should only occur for the case where users are using RocksDB 
backends + heap-based timers (using the heap backend should be fine, would not 
bump into this).

[~xianggao] to help me understand the full problem: in your scenarios, I'm 
assuming that the timer losses are caused by somehow the 
{{InternalTimerServiceSerializationProxy}} silently skipping reading the 
timers, instead of some {{IOException}} due to incorrect read attempts (and 
eventually fails the restore, instead of a timer loss). Could you clarify if my 
assumption is correct?

As for the fix, I would suggest to try to reuse the 
{{java.io.DataInput#readFully}} method instead or re-implementing it:
{code}
byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];

DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
inputView.readFully(tmp);
{code}

You can catch {{EOFException}} to determine if end of stream is reached.




was (Author: tzulitai):
This looks like a real issue, the typical {{read}} v.s. {{readFully}} mistake.

This read path should only occur for the case where users are using RocksDB 
backends + heap-based timers (using the heap backend should be fine, would not 
bump into this).

[~xianggao] to help me understand the full problem: in your scenarios, I'm 
assuming that the timer losses are caused by somehow the 
{{InternalTimerServiceSerializationProxy}} silently skipping reading the 
timers, instead of some {{IOException}} due to incorrect read attempts (and 
eventually fails the restore, instead of a timer loss). Could you clarify if my 
assumption is correct?

As for the fix, I would suggest to try to reuse the 
`java.io.DataInput#readFully` method instead or re-implementing it:
{code}
byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];

DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
inputView.readFully(tmp);
{code}

You can catch {{EOFException}} to determine if end of stream is reached.



> Timer loss after restoring from savepoint
> -----------------------------------------
>
>                 Key: FLINK-19300
>                 URL: https://issues.apache.org/jira/browse/FLINK-19300
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>            Reporter: Xiang Gao
>            Priority: Critical
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to