[ 
https://issues.apache.org/jira/browse/FLINK-17918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120870#comment-17120870
 ] 

Piotr Nowojski commented on FLINK-17918:
----------------------------------------

Some status update. [~AHeise] has added quite a lot of debug messages, focusing on:

{{org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase#testDifferentTypesSumWithRetract}}

This test fails fairly reliably locally, with unaligned checkpoints enabled by default, after 2-15 minutes of running the following command (sometimes you might need to bump the fork count or the number of parallel iterations in the for loop):
{noformat}
for i in {0..7}; do
  mvn integration-test \
    -Dlog4j.configurationFile=/Users/arv/workspace/flink/flink-table/flink-table-planner-blink/src/test/resources/log4j2-test.properties \
    -Dflink.forkCount=1 -Dflink.forkCountTestPackage=1 -Dfast \
    -Dflink.tests.with-openssl -Dhadoop.version=2.8.3 -Dinclude_hadoop_aws \
    -Dscala-2.11 -pl flink-table/flink-table-planner-blink \
    -Dtest=org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase#testDifferentTypesSumWithRetract \
    &>test_$i.txt &
done
{noformat}
After adding a lot of debug messages, [~AHeise] pinpointed the problem to the following scenario: somehow, during the async part of checkpointing, the Blink operator (in the case we are debugging, it's the {{AppendOnlyTopNFunction}} implementation of the {{LIMIT}} operation) is checkpointing too many records. A sketch of the suspected race follows the list below.

# The upstream source task is sending {{[record1, chk1, record2, record3, record4]}}, and the downstream {{LIMIT}} operator is receiving them in this order
# only {{record1}} is processed
# the sync part of checkpoint {{chk1}} completes
# AFTER that, {{LIMIT}} processes {{record2, record3, record4}}...
# ... but somehow the async part of the checkpoint ends up writing the state as if all of the records had been processed, including {{record2, record3, record4}}, which shouldn't be part of checkpoint 1 for the {{LIMIT}} operator's state
# a failure happens
# the {{LIMIT}} operator restores with 4 records in its state, but ...
# the source correctly re-emits {{record2, record3, record4}}
# this means the {{LIMIT}} operator reaches its limit 3 records sooner, and the last 3 expected records are lost
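
To make the suspected race easier to reason about, here is a minimal, self-contained sketch (hypothetical code, not the actual Flink snapshotting path; class and method names are made up): if the sync part hands the async part a reference to the live state instead of a copy, every record processed afterwards leaks into the checkpoint, matching steps 4-5 above.
{noformat}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class AsyncSnapshotRace {

    // live operator state, analogous to the {{LIMIT}} operator's buffer
    private final List<String> state = new CopyOnWriteArrayList<>();

    // sync part of the checkpoint: should capture an immutable copy,
    // e.g. "return new ArrayList<>(state);" -- the buggy variant below
    // only hands over a reference to the live state
    private List<String> syncPhase() {
        return state; // BUG: no copy, later mutations leak into the snapshot
    }

    public static void main(String[] args) throws Exception {
        AsyncSnapshotRace operator = new AsyncSnapshotRace();
        operator.state.add("record1");                  // step 2: record1 processed

        List<String> handedOver = operator.syncPhase(); // step 3: sync part of chk1 done

        ExecutorService asyncPart = Executors.newSingleThreadExecutor();
        Future<List<String>> checkpoint = asyncPart.submit(() -> {
            Thread.sleep(100);                     // async part runs concurrently...
            return new ArrayList<>(handedOver);    // ...and serializes the live state
        });

        operator.state.add("record2");             // step 4: processed AFTER sync part
        operator.state.add("record3");
        operator.state.add("record4");

        // step 5: the checkpoint will almost certainly also contain record2..record4
        System.out.println("checkpointed state: " + checkpoint.get());
        asyncPart.shutdown();
    }
}
{noformat}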

The same failure can be induced with *aligned checkpoints* by altering the timing of the operations ([~AHeise], please confirm: adding a 100ms sleep after emitting the first record and a 100ms sleep in the async part of the checkpointing operation).
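
For reference, a sketch of where such delays could go (again hypothetical, reusing the shape of the sketch above, not the actual instrumentation): one sleep after emitting the first record, one inside the async part, so even without unaligned checkpoints the task thread has time to mutate the shared state mid-snapshot.
{noformat}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class AlignedTimingRepro {
    public static void main(String[] args) throws Exception {
        List<String> liveState = new CopyOnWriteArrayList<>();
        ExecutorService asyncPart = Executors.newSingleThreadExecutor();

        liveState.add("record1");
        Thread.sleep(100); // delay #1: after emitting the first record, so the
                           // (aligned) barrier overtakes record2..record4

        // barrier arrives; the sync part hands over the live state (the suspected bug)
        Future<List<String>> checkpoint = asyncPart.submit(() -> {
            Thread.sleep(100); // delay #2: in the async part of the checkpoint
            return new ArrayList<>(liveState);
        });

        liveState.add("record2"); // processed after the sync part, during delay #2
        liveState.add("record3");
        liveState.add("record4");

        System.out.println("checkpoint contains: " + checkpoint.get());
        asyncPart.shutdown();
    }
}
{noformat}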

> Blink Jobs are losing data on recovery
> ---------------------------------------
>
>                 Key: FLINK-17918
>                 URL: https://issues.apache.org/jira/browse/FLINK-17918
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Network
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Assignee: Arvid Heise
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> After trying to enable unaligned checkpoints by default, a lot of Blink 
> streaming SQL/Table API tests containing joins or set operations are throwing 
> errors indicating that we are losing some data (full records, without 
> deserialisation errors). Example errors:
> {noformat}
> [ERROR] Failures: 
> [ERROR]   JoinITCase.testFullJoinWithEqualPk:775 expected:<List(1,1, 2,2, 
> 3,3, null,4, null,5)> but was:<List(2,2, 3,3, null,1, null,4, null,5)>
> [ERROR]   JoinITCase.testStreamJoinWithSameRecord:391 expected:<List(1,1,1,1, 
> 1,1,1,1, 2,2,2,2, 2,2,2,2, 3,3,3,3, 3,3,3,3, 4,4,4,4, 4,4,4,4, 5,5,5,5, 
> 5,5,5,5)> but was:<List()>
> [ERROR]   SemiAntiJoinStreamITCase.testAntiJoin:352 expected:<0> but was:<1>
> [ERROR]   SetOperatorsITCase.testIntersect:55 expected:<MutableList(1,1,Hi, 
> 2,2,Hello, 3,2,Hello world)> but was:<List()>
> [ERROR]   JoinITCase.testJoinPushThroughJoin:1272 expected:<List(1,0,Hi, 
> 2,1,Hello, 2,1,Hello world)> but was:<List(2,1,Hello, 2,1,Hello world)>
> {noformat}
>  


