[ https://issues.apache.org/jira/browse/FLINK-17918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121526#comment-17121526 ]
Jark Wu commented on FLINK-17918: ---------------------------------

Thanks [~pnowojski] and [~AHeise] for the investigation. After discussing with [~yunta], [~yunta] pointed out that users must take care of deep copying when using the heap state backend. The Javadoc of CopyOnWriteStateMap says:

{quote} IMPORTANT: the contracts for this class rely on the user not holding any references to objects returned by this map beyond the life cycle of per-element operations. Or phrased differently, all get-update-put operations on a mapping should be within one call of processElement. Otherwise, the user must take care of taking deep copies, e.g. for caching purposes. {quote}

However, I didn't find this note in the documentation. If this is true, I think we should update the documentation and change {{AppendOnlyTopNFunction}} to put a copied list into {{dataState}} (which will bring additional cost for the RocksDB state backend). We should also go through all the other operators.

Besides this, I can also reproduce the failure of {{AggregateITCase#testDifferentTypesSumWithRetract}} with [~AHeise]'s commit. But I think this may be another issue, because the simplest case "[LocalGlobal=OFF, MiniBatch=OFF, StateBackend=HEAP]", which uses {{GroupAggFunction}}, also fails. In {{GroupAggFunction}}, the {{accumulators}} object passed to [{{accState.update(accumulators)}}|https://github.com/apache/flink/blob/4ff59a72b4c8532510cca349840fcbe668de911e/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L179] is always a new object.
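To make the CopyOnWriteStateMap contract above concrete, here is a minimal, self-contained Java sketch of the aliasing hazard (not Flink code: {{backingState}}, {{unsafeRead}}, and {{safeRead}} are hypothetical names standing in for heap-backed keyed state, which returns stored objects by reference rather than by copy):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeapStateAliasing {

    // Stands in for heap-backed keyed state: get() hands back the stored
    // reference directly, with no defensive copy (like CopyOnWriteStateMap
    // outside of snapshotting).
    static final Map<String, List<Integer>> backingState = new HashMap<>();

    // Unsafe: the caller keeps the returned list beyond the per-element
    // operation; any later mutation silently rewrites the stored state.
    static List<Integer> unsafeRead(String key) {
        return backingState.get(key);
    }

    // Safe: deep-copy on read, as the CopyOnWriteStateMap Javadoc requires
    // when a reference outlives a single processElement() call.
    static List<Integer> safeRead(String key) {
        return new ArrayList<>(backingState.get(key));
    }

    public static void main(String[] args) {
        backingState.put("k", new ArrayList<>(List.of(1, 2)));
        List<Integer> cached = unsafeRead("k");
        cached.add(99); // also mutates the stored list
        System.out.println(backingState.get("k")); // [1, 2, 99]

        backingState.put("k", new ArrayList<>(List.of(1, 2)));
        List<Integer> copy = safeRead("k");
        copy.add(99); // stored state is untouched
        System.out.println(backingState.get("k")); // [1, 2]
    }
}
```

This is also why the bug does not show up on RocksDB: that backend serializes on every read/write, so callers always receive a fresh object, and why putting a copied list into {{dataState}} would add cost there.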
> Blink Jobs are losing data on recovery
> ---------------------------------------
>
>                 Key: FLINK-17918
>                 URL: https://issues.apache.org/jira/browse/FLINK-17918
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Table SQL / Runtime
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.11.0
>
> After trying to enable unaligned checkpoints by default, a lot of Blink streaming SQL/Table API tests containing joins or set operations are throwing errors indicating that we are losing some data (full records, without deserialisation errors). Example errors:
> {noformat}
> [ERROR] Failures:
> [ERROR] JoinITCase.testFullJoinWithEqualPk:775 expected:<List(1,1, 2,2, 3,3, null,4, null,5)> but was:<List(2,2, 3,3, null,1, null,4, null,5)>
> [ERROR] JoinITCase.testStreamJoinWithSameRecord:391 expected:<List(1,1,1,1, 1,1,1,1, 2,2,2,2, 2,2,2,2, 3,3,3,3, 3,3,3,3, 4,4,4,4, 4,4,4,4, 5,5,5,5, 5,5,5,5)> but was:<List()>
> [ERROR] SemiAntiJoinStreamITCase.testAntiJoin:352 expected:<0> but was:<1>
> [ERROR] SetOperatorsITCase.testIntersect:55 expected:<MutableList(1,1,Hi, 2,2,Hello, 3,2,Hello world)> but was:<List()>
> [ERROR] JoinITCase.testJoinPushThroughJoin:1272 expected:<List(1,0,Hi, 2,1,Hello, 2,1,Hello world)> but was:<List(2,1,Hello, 2,1,Hello world)>
> {noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)