[ https://issues.apache.org/jira/browse/FLINK-17918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121031#comment-17121031 ]
Piotr Nowojski commented on FLINK-17918:
----------------------------------------

I think [~AHeise] is right. We are mutating the list that is stored in the state field {{AppendOnlyTopNFunction#dataState}} [here|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java#L113].

Please correct me if I'm wrong, since I do not know the guarantees of our state backends very well, but from looking at the relevant code there is an assumption that the synchronous part of the checkpoint makes all of the defensive copies/protections needed to "freeze" the state snapshot for the later (asynchronous) IO operations. This seems to be confirmed by the javadoc of {{org.apache.flink.runtime.state.heap.CopyOnWriteStateMap#stateSnapshot}} (which is invoked in the synchronous checkpoint part for the {{AppendOnlyTopNFunction#dataState}} field):

{code:java}
/**
 * Creates a snapshot of this {@link CopyOnWriteStateMap}, to be written in checkpointing. The snapshot integrity
 * is protected through copy-on-write from the {@link CopyOnWriteStateMap}. Users should call
 * {@link #releaseSnapshot(StateMapSnapshot)} after using the returned object.
 *
 * @return a snapshot from this {@link CopyOnWriteStateMap}, for checkpointing.
 */
{code}

However, {{org.apache.flink.runtime.state.heap.CopyOnWriteStateMap#snapshotMapArrays}} only marks whole {{StateMapEntry}} objects (the type of {{AppendOnlyTopNFunction#dataState}} is {{MapState<RowData, List<RowData>>}}), nothing more. So the defensive copy-on-write works only as long as nobody else mutates the referenced structures (like the {{List<RowData>}}) in the background.
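To illustrate the aliasing problem outside of Flink, here is a simplified, hypothetical model (not the real {{CopyOnWriteStateMap}}): the "synchronous" snapshot copies only the map structure, so the {{List}} values are shared between the live state and the snapshot. The class {{ShallowSnapshotDemo}} and its methods are made up for this sketch.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model (NOT Flink's CopyOnWriteStateMap): the snapshot
 * copies only the key -> value references, so the value lists are shared
 * between the live state and the snapshot.
 */
public class ShallowSnapshotDemo {

    /** Shallow "sync part" snapshot: a new map holding the same List instances. */
    public static Map<String, List<String>> shallowSnapshot(Map<String, List<String>> live) {
        return new HashMap<>(live);
    }

    /** Mutates the stored list in place after the snapshot was taken. */
    public static List<String> snapshotAfterInPlaceMutation() {
        Map<String, List<String>> live = new HashMap<>();
        live.put("k", new ArrayList<>(List.of("a", "b")));
        Map<String, List<String>> snapshot = shallowSnapshot(live);
        live.get("k").add("c"); // in-place mutation leaks into the snapshot
        return snapshot.get("k"); // what a later "async" writer would see
    }

    /** Copies the list, mutates the copy and puts the new instance back. */
    public static List<String> snapshotAfterCopyAndPut() {
        Map<String, List<String>> live = new HashMap<>();
        live.put("k", new ArrayList<>(List.of("a", "b")));
        Map<String, List<String>> snapshot = shallowSnapshot(live);
        List<String> updated = new ArrayList<>(live.get("k"));
        updated.add("c");
        live.put("k", updated); // only the live map's reference changes
        return snapshot.get("k"); // snapshot still holds the original list
    }

    public static void main(String[] args) {
        System.out.println("in-place mutation: " + snapshotAfterInPlaceMutation()); // [a, b, c]
        System.out.println("copy and put:      " + snapshotAfterCopyAndPut());      // [a, b]
    }
}
{code}

Under these assumptions, mutating the stored list in place silently changes what the snapshot contains, while replacing the value with a fresh list leaves the snapshot frozen. This is only a sketch of the aliasing problem described above, not the actual fix for this ticket.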
> Blink Jobs are loosing data on recovery
> ---------------------------------------
>
>                 Key: FLINK-17918
>                 URL: https://issues.apache.org/jira/browse/FLINK-17918
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Table SQL / Runtime
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Assignee: Arvid Heise
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> After we tried to enable unaligned checkpoints by default, many Blink
> streaming SQL/Table API tests containing joins or set operations started
> failing with errors indicating that we are losing data (whole records,
> without any deserialisation errors). Example errors:
> {noformat}
> [ERROR] Failures:
> [ERROR] JoinITCase.testFullJoinWithEqualPk:775 expected:<List(1,1, 2,2, 3,3, null,4, null,5)> but was:<List(2,2, 3,3, null,1, null,4, null,5)>
> [ERROR] JoinITCase.testStreamJoinWithSameRecord:391 expected:<List(1,1,1,1, 1,1,1,1, 2,2,2,2, 2,2,2,2, 3,3,3,3, 3,3,3,3, 4,4,4,4, 4,4,4,4, 5,5,5,5, 5,5,5,5)> but was:<List()>
> [ERROR] SemiAntiJoinStreamITCase.testAntiJoin:352 expected:<0> but was:<1>
> [ERROR] SetOperatorsITCase.testIntersect:55 expected:<MutableList(1,1,Hi, 2,2,Hello, 3,2,Hello world)> but was:<List()>
> [ERROR] JoinITCase.testJoinPushThroughJoin:1272 expected:<List(1,0,Hi, 2,1,Hello, 2,1,Hello world)> but was:<List(2,1,Hello, 2,1,Hello world)>
> {noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)