[ https://issues.apache.org/jira/browse/FLINK-17918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120968#comment-17120968 ]

Arvid Heise commented on FLINK-17918:
-------------------------------------

Yes [~ykt836]. I suspect that when [updating the state of Limit|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java#L116], we put a mutable list into the state as the value. As a result, the value can still be mutated after the state has been snapshotted.

Thus, by the time the value is actually serialized in the background, the task thread may already have updated the list. The checkpoint then no longer reflects the state as it was when the snapshot was taken.
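
A minimal plain-Java sketch of the suspected hazard follows. It makes simplifying assumptions. The hypothetical `AliasingSnapshotDemo` models a heap-backed state whose synchronous snapshot copies only the key-to-value map, so the snapshot and the live state share the same `List` objects. This is not Flink's actual heap backend. The "background" serialization is also done sequentially rather than on a second thread, so that the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a heap-backed keyed state (NOT Flink's implementation).
// The snapshot copies the map itself, but both maps still point at the same List objects.
class AliasingSnapshotDemo {

    // Live state: key -> mutable list of records, as in the suspected Limit/TopN code path.
    static final Map<String, List<Integer>> liveState = new HashMap<>();

    // "Synchronous" part of the checkpoint: a shallow copy of the map only.
    static Map<String, List<Integer>> snapshot() {
        return new HashMap<>(liveState);
    }

    // "Asynchronous" part: turn the snapshot into its serialized form later.
    static String serialize(Map<String, List<Integer>> snap) {
        return snap.toString();
    }

    static String run() {
        liveState.clear();
        List<Integer> buffer = new ArrayList<>();
        buffer.add(1);
        liveState.put("k", buffer);                     // the mutable list itself becomes the state value

        Map<String, List<Integer>> snap = snapshot();   // checkpoint taken while the value is [1]

        buffer.add(2);                                  // task thread keeps mutating the same list

        return serialize(snap);                         // serialization now sees [1, 2]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running `main` prints `{k=[1, 2]}`. The snapshot was taken when the value was `[1]`, but the serializer writes the later contents.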

I assume a conservative fix is to store a copy of the list as the value. Alternatively, we could probably use a keyed list state, but I'm not very knowledgeable about state handling.
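
The conservative fix can be sketched the same way. Again, this is a hypothetical, simplified model rather than the `AppendOnlyTopNFunction` code. The helper `updateState` is illustrative: it stores a fresh `ArrayList` copy, so the task thread only ever mutates its own working list. The snapshot therefore keeps the contents it had at checkpoint time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model (NOT Flink code) of the "store a copy" fix:
// the state value is a private copy of the task's working list.
class DefensiveCopyDemo {

    static final Map<String, List<Integer>> liveState = new HashMap<>();

    // Still only a shallow copy of the map, exactly as in the hazard sketch.
    static Map<String, List<Integer>> snapshot() {
        return new HashMap<>(liveState);
    }

    // Copy on update: later changes to `working` cannot reach the stored value.
    static void updateState(String key, List<Integer> working) {
        liveState.put(key, new ArrayList<>(working));
    }

    static String run() {
        liveState.clear();
        List<Integer> working = new ArrayList<>();
        working.add(1);
        updateState("k", working);

        Map<String, List<Integer>> snap = snapshot();   // checkpoint taken while the value is [1]

        working.add(2);                                 // mutates only the task's own list
        updateState("k", working);                      // replaces the live value with a new copy

        return snap.toString();                         // the snapshot still holds the old list
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here `run()` returns `{k=[1]}`, while the live value becomes `[1, 2]`. The trade-off is one O(n) copy per update. The keyed list state alternative would instead let the backend own the elements, which this sketch does not model.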

> Blink Jobs are losing data on recovery
> ---------------------------------------
>
>                 Key: FLINK-17918
>                 URL: https://issues.apache.org/jira/browse/FLINK-17918
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Table SQL / Runtime
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Assignee: Arvid Heise
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> After trying to enable unaligned checkpoints by default, a lot of Blink 
> streaming SQL/Table API tests containing joins or set operations are throwing 
> errors that indicate we are losing some data (full records, without 
> deserialisation errors). Example errors:
> {noformat}
> [ERROR] Failures: 
> [ERROR]   JoinITCase.testFullJoinWithEqualPk:775 expected:<List(1,1, 2,2, 3,3, null,4, null,5)> but was:<List(2,2, 3,3, null,1, null,4, null,5)>
> [ERROR]   JoinITCase.testStreamJoinWithSameRecord:391 expected:<List(1,1,1,1, 1,1,1,1, 2,2,2,2, 2,2,2,2, 3,3,3,3, 3,3,3,3, 4,4,4,4, 4,4,4,4, 5,5,5,5, 5,5,5,5)> but was:<List()>
> [ERROR]   SemiAntiJoinStreamITCase.testAntiJoin:352 expected:<0> but was:<1>
> [ERROR]   SetOperatorsITCase.testIntersect:55 expected:<MutableList(1,1,Hi, 2,2,Hello, 3,2,Hello world)> but was:<List()>
> [ERROR]   JoinITCase.testJoinPushThroughJoin:1272 expected:<List(1,0,Hi, 2,1,Hello, 2,1,Hello world)> but was:<List(2,1,Hello, 2,1,Hello world)>
> {noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
