[ 
https://issues.apache.org/jira/browse/FLINK-17918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121526#comment-17121526
 ] 

Jark Wu commented on FLINK-17918:
---------------------------------

Thanks [~pnowojski] and [~AHeise] for investigating. I discussed this with 
[~yunta], who pointed out that users must take care of deep copying when 
using the heap state backend. The Javadoc of {{CopyOnWriteStateMap}} says:

{quote}
IMPORTANT: the contracts for this class rely on the user not holding any 
references to objects returned by this map
beyond the life cycle of per-element operations. Or phrased differently, all 
get-update-put operations on a mapping
should be within one call of processElement. Otherwise, the user must take care 
of taking deep copies, e.g. for
caching purposes.
{quote}

However, I couldn't find this note in the documentation. If this is true, I 
think we should update the documentation and change {{AppendOnlyTopNFunction}} 
to put a copied list into {{dataState}} (this will add extra cost for the 
RocksDB state backend). We should also go through all the other operators.
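
To illustrate the hazard, here is a minimal sketch in plain Java. It is a 
simplified model and not Flink code: {{HeapListState}}, {{shallowSnapshot}} and 
{{snapshotSizeAfterLateMutation}} are hypothetical names, and the snapshot here 
simply shares value objects with the live table, which is an assumption made 
for the example rather than a description of how {{CopyOnWriteStateMap}} 
works internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model only: HeapListState is a hypothetical stand-in, not a Flink class.
class HeapStateAliasingSketch {

    /** Keeps object references on the heap instead of serialized bytes. */
    static final class HeapListState {
        private final Map<String, List<Integer>> table = new HashMap<>();

        void put(String key, List<Integer> value) {
            table.put(key, value); // stores the reference, nothing is serialized
        }

        /** Copies the map structure but shares the value objects with the live table. */
        Map<String, List<Integer>> shallowSnapshot() {
            return new HashMap<>(table);
        }
    }

    /**
     * Puts a cached list into state, takes a snapshot, then mutates the cached list.
     * Returns the size of the list as seen through the snapshot.
     */
    static int snapshotSizeAfterLateMutation(boolean copyOnPut) {
        HeapListState dataState = new HeapListState();
        List<Integer> cached = new ArrayList<>();
        cached.add(1);

        // Integer elements are immutable, so copying the list is effectively a deep copy here.
        dataState.put("key", copyOnPut ? new ArrayList<>(cached) : cached);
        Map<String, List<Integer>> snapshot = dataState.shallowSnapshot();

        cached.add(2); // the operator keeps using its cached reference after the snapshot
        return snapshot.get("key").size();
    }

    public static void main(String[] args) {
        System.out.println("shared reference: " + snapshotSizeAfterLateMutation(false));
        System.out.println("copied on put:    " + snapshotSizeAfterLateMutation(true));
    }
}
```

With the shared reference, the late mutation leaks into the snapshot (size 2). 
With the copy, the snapshot keeps the value it had when it was taken (size 1). 
The copy is the extra cost mentioned above, and a backend that already 
serializes on every write gains nothing from it.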

Besides this, I can also reproduce the failure of 
{{AggregateITCase#testDifferentTypesSumWithRetract}} with [~AHeise]'s commit. 
But I think this may be a separate issue, because the simplest case 
"[LocalGlobal=OFF, MiniBatch=OFF, StateBackend=HEAP]", which uses 
{{GroupAggFunction}}, also fails. In {{GroupAggFunction}}, the 
{{accumulators}} object passed to 
[{{accState.update(accumulators)}}|https://github.com/apache/flink/blob/4ff59a72b4c8532510cca349840fcbe668de911e/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L179]
 is always a new object.


> Blink Jobs are loosing data on recovery
> ---------------------------------------
>
>                 Key: FLINK-17918
>                 URL: https://issues.apache.org/jira/browse/FLINK-17918
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Table SQL / Runtime
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> After trying to enable unaligned checkpoints by default, a lot of Blink 
> streaming SQL/Table API tests containing joins or set operations are throwing 
> errors indicating that we are losing some data (full records, without 
> deserialisation errors). Example errors:
> {noformat}
> [ERROR] Failures: 
> [ERROR]   JoinITCase.testFullJoinWithEqualPk:775 expected:<List(1,1, 2,2, 
> 3,3, null,4, null,5)> but was:<List(2,2, 3,3, null,1, null,4, null,5)>
> [ERROR]   JoinITCase.testStreamJoinWithSameRecord:391 expected:<List(1,1,1,1, 
> 1,1,1,1, 2,2,2,2, 2,2,2,2, 3,3,3,3, 3,3,3,3, 4,4,4,4, 4,4,4,4, 5,5,5,5, 
> 5,5,5,5)> but was:<List()>
> [ERROR]   SemiAntiJoinStreamITCase.testAntiJoin:352 expected:<0> but was:<1>
> [ERROR]   SetOperatorsITCase.testIntersect:55 expected:<MutableList(1,1,Hi, 
> 2,2,Hello, 3,2,Hello world)> but was:<List()>
> [ERROR]   JoinITCase.testJoinPushThroughJoin:1272 expected:<List(1,0,Hi, 
> 2,1,Hello, 2,1,Hello world)> but was:<List(2,1,Hello, 2,1,Hello world)>
> {noformat}
>  


