[ https://issues.apache.org/jira/browse/FLINK-17918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120870#comment-17120870 ]
Piotr Nowojski commented on FLINK-17918:
----------------------------------------

Some status update. [~AHeise] has added quite a lot of debug messages, focusing on {{org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase#testDifferentTypesSumWithRetract}}. This test fails fairly reliably locally, with unaligned checkpoints enabled by default, after 2-15 minutes when running the following command (sometimes you might need to bump the fork count or the number of parallel iterations of the for loop):
{noformat}
for i in {0..7}; do mvn -Dlog4j.configurationFile=/Users/arv/workspace/flink/flink-table/flink-table-planner-blink/src/test/resources/log4j2-test.properties -Dflink.forkCount=1 -Dflink.forkCountTestPackage=1 -Dfast -Dflink.tests.with-openssl -Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -pl flink-table/flink-table-planner-blink integration-test -Dtest=org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase$testDifferentTypesSumWithRetract &>test_$i.txt & done
{noformat}
After adding a lot of debug messages, [~AHeise] pinpointed the problem to the following scenario: somehow, during the async part of checkpointing, a Blink operator (in the case we are debugging, the {{AppendOnlyTopNFunction}} implementation of the {{LIMIT}} operation) is checkpointing too many records.
# The upstream source task sends {{[record1, chk1, record2, record3, record4]}}; the downstream {{LIMIT}} task receives them in this order.
# Only record1 is processed.
# The sync part of checkpoint chk1 completes.
# AFTER that, {{LIMIT}} processes {{record2, record3, record4}}...
# ... but somehow the async part of the checkpoint ends up writing state as if all of the records had been processed, including {{record2, record3, record4}}, which should not be part of checkpoint 1 for the {{LIMIT}} operator's state.
# A failure happens.
# The {{LIMIT}} operator restores with 4 records in its state, but...
# ... the source correctly re-emits {{record2, record3, record4}}.
# As a result, the {{LIMIT}} operator reaches its limit 3 records too soon, and the last 3 expected records are lost.
The same failure can be induced with *aligned checkpoints* by altering the timing of the operations ([~AHeise]: adding a 100ms sleep after emitting the first record and a 100ms sleep in the async part of the checkpointing operation).
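To make the suspected race easier to picture, below is a minimal, self-contained sketch (plain Java, not Flink code; all class and method names are made up for illustration) of how an async snapshot that captures mutable operator state by reference, instead of taking a point-in-time copy in the sync phase, would end up containing records processed after the checkpoint barrier:
{noformat}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Illustration only (not Flink code): state captured by reference in the
 *  sync phase keeps being mutated before the async phase serializes it. */
public class AsyncSnapshotRaceSketch {

    // Hypothetical operator state, analogous to the LIMIT/TopN buffer.
    private final List<String> buffer = new ArrayList<>();

    void processElement(String record) {
        buffer.add(record);
    }

    /** Sync phase: should freeze the state, but only grabs a reference (the bug). */
    List<String> snapshotSync() {
        return buffer;                     // a correct version would return new ArrayList<>(buffer)
    }

    /** Async phase: serializes whatever the captured reference points to *now*. */
    CompletableFuture<List<String>> snapshotAsync(List<String> captured) {
        return CompletableFuture.supplyAsync(() -> new ArrayList<>(captured));
    }

    public static void main(String[] args) throws Exception {
        AsyncSnapshotRaceSketch op = new AsyncSnapshotRaceSketch();

        op.processElement("record1");                  // processed before the chk1 barrier
        List<String> captured = op.snapshotSync();     // sync part of chk1 completes

        op.processElement("record2");                  // processed after the barrier,
        op.processElement("record3");                  // before the async write finishes
        op.processElement("record4");

        // Prints all 4 records although only record1 belongs to chk1, so on
        // recovery the LIMIT operator would reach its limit 3 records too soon.
        System.out.println(op.snapshotAsync(captured).get());
    }
}
{noformat}
If something along these lines is what happens, the 100ms sleeps mentioned above would merely widen the window between the sync capture and the async write, which would also explain why the same failure is reproducible with aligned checkpoints.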
> Blink Jobs are losing data on recovery
> ---------------------------------------
>
> Key: FLINK-17918
> URL: https://issues.apache.org/jira/browse/FLINK-17918
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Network
> Affects Versions: 1.11.0
> Reporter: Piotr Nowojski
> Assignee: Arvid Heise
> Priority: Blocker
> Fix For: 1.11.0
>
> After trying to enable unaligned checkpoints by default, a lot of Blink streaming SQL/Table API tests containing joins or set operations are throwing errors indicating that we are losing some data (full records, without deserialisation errors).
> Example errors:
> {noformat}
> [ERROR] Failures:
> [ERROR] JoinITCase.testFullJoinWithEqualPk:775 expected:<List(1,1, 2,2, 3,3, null,4, null,5)> but was:<List(2,2, 3,3, null,1, null,4, null,5)>
> [ERROR] JoinITCase.testStreamJoinWithSameRecord:391 expected:<List(1,1,1,1, 1,1,1,1, 2,2,2,2, 2,2,2,2, 3,3,3,3, 3,3,3,3, 4,4,4,4, 4,4,4,4, 5,5,5,5, 5,5,5,5)> but was:<List()>
> [ERROR] SemiAntiJoinStreamITCase.testAntiJoin:352 expected:<0> but was:<1>
> [ERROR] SetOperatorsITCase.testIntersect:55 expected:<MutableList(1,1,Hi, 2,2,Hello, 3,2,Hello world)> but was:<List()>
> [ERROR] JoinITCase.testJoinPushThroughJoin:1272 expected:<List(1,0,Hi, 2,1,Hello, 2,1,Hello world)> but was:<List(2,1,Hello, 2,1,Hello world)>
> {noformat}