[ https://issues.apache.org/jira/browse/FLINK-23233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375870#comment-17375870 ]

Yun Gao commented on FLINK-23233:
---------------------------------

It seems the issue is caused by the following part of the log:
{code:java}
01:37:29,362 [           Thread-49] INFO  org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - Invalid request. Received version = 8a0b400e-0aa9-4223-89dc-f7638a67a845, offset = 0, while expected version = 99a5868b-4923-46ec-ad60-58443bdba519, offset = 0
01:37:29,764 [    Checkpoint Timer] WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to trigger checkpoint 5 for job a4214b0065b42350a8831b7ce4a32ad2. (1 consecutive failed attempts so far)
org.apache.flink.util.FlinkException: Failing OperatorCoordinator checkpoint because some OperatorEvents before this checkpoint barrier were not received by the target tasks.
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$completeCheckpointOnceEventsAreDone$4(OperatorCoordinatorHolder.java:344) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_282]
    at org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:905) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_282]
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$23(FutureUtils.java:1356) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_282]
    at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_282]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_282]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_282]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_282]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
01:37:29,766 [    Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 6 (type=CHECKPOINT) @ 1625276249765 for job a4214b0065b42350a8831b7ce4a32ad2.
01:37:29,768 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 6 for job a4214b0065b42350a8831b7ce4a32ad2 (942 bytes in 3 ms).
01:37:29,768 [SourceCoordinator-Source: numbers -> Map -> Sink: Data stream collect sink] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 6 as completed for source Source: numbers -> Map -> Sink: Data stream collect sink.
01:37:29,769 [    Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 7 (type=CHECKPOINT) @ 1625276249768 for job a4214b0065b42350a8831b7ce4a32ad2.
01:37:29,770 [jobmanager-future-thread-2] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 7 for job a4214b0065b42350a8831b7ce4a32ad2 (942 bytes in 2 ms).
01:37:29,770 [SourceCoordinator-Source: numbers -> Map -> Sink: Data stream collect sink] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 7 as completed for source Source: numbers -> Map -> Sink: Data stream collect sink.
01:37:29,771 [    Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 8 (type=CHECKPOINT) @ 1625276249770 for job a4214b0065b42350a8831b7ce4a32ad2.
01:37:29,772 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 8 for job a4214b0065b42350a8831b7ce4a32ad2 (942 bytes in 2 ms).
01:37:29,773 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: numbers -> Map -> Sink: Data stream collect sink (1/1) (f37c51a50966c297173f7852774e54f2) switched from RUNNING to FAILED on 0057f3d7-309e-4ba2-8b12-99d2269c5aa7 @ localhost (dataPort=-1).
org.apache.flink.util.FlinkException: An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. Event: 'AddSplitEvents[[[B@6c71f390]]', targetTask: Source: numbers -> Map -> Sink: Data stream collect sink (1/1) - execution #1
    at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.lambda$sendEvent$0(SubtaskGatewayImpl.java:81) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:?]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_282]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]

{code}
Currently, for the OperatorCoordinator, each event-sending action is bound to a future, and two actions rely on the result of that future:
 # When a checkpoint is taken, it checks whether all previous events were sent successfully; if not, it fails the checkpoint. During this process, however, it also cleans up the pending previous events.
 # For each event, if sending fails, it triggers a failover of the target subtask.
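The two consumers of a send future can be sketched with plain `CompletableFuture`s. This is a minimal, hypothetical model under stated assumptions: the class `EventSendTracker` and its methods are illustrative names only, not Flink's `OperatorCoordinatorHolder` or `SubtaskGatewayImpl` API, and the real bookkeeping is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * Hypothetical model of the two consumers of an event-send future.
 * Names are illustrative assumptions, not Flink's actual API.
 */
final class EventSendTracker {

    // Send futures of events emitted since the last checkpoint was triggered.
    private final List<CompletableFuture<Void>> pendingSends = new ArrayList<>();
    // Hypothetical hook standing in for "fail over the target subtask".
    private final Consumer<Throwable> failoverSubtask;

    EventSendTracker(Consumer<Throwable> failoverSubtask) {
        this.failoverSubtask = failoverSubtask;
    }

    /** Consumer 2: registers a callback that fails over the subtask if this send fails. */
    void sendEvent(CompletableFuture<Void> sendResult) {
        pendingSends.add(sendResult);
        sendResult.whenComplete((ignored, error) -> {
            if (error != null) {
                failoverSubtask.accept(error);
            }
        });
    }

    /**
     * Consumer 1: the returned checkpoint future fails if any earlier send failed.
     * The pending list is cleared immediately, so a later checkpoint no longer
     * sees the failed event at all.
     */
    CompletableFuture<Void> triggerCheckpoint() {
        CompletableFuture<?>[] sends = pendingSends.toArray(new CompletableFuture<?>[0]);
        pendingSends.clear();
        return CompletableFuture.allOf(sends);
    }
}
```

With one failed send, the first checkpoint future completes exceptionally and the failover callback fires, yet the next `triggerCheckpoint()` succeeds because the failed future was already cleared. Nothing in this arrangement orders the failover before that next checkpoint; in the log above, checkpoints 6 to 8 complete between 01:37:29,766 and 01:37:29,772, while the task only switches to FAILED at 01:37:29,773.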

In this case:
 # The job has only one task (source -> map -> sink (1/1)), and the source has 3 splits, namely 1 - 34, 35 - 67 and 68 - 100.
 # During the earlier execution, AddSplit[1-34] is emitted and processed, so now only two splits remain.
 # Then, as the log above shows, the next event, AddSplit[68-100], is emitted before checkpoint 5, and the test deliberately makes the event sending fail due to a timeout.
 # The checkpoint-related stage executes first and aborts checkpoint 5. In this process it removes the pending event, so now only 1 split (35-67) remains.
 # New checkpoints are then triggered. Since there are no failed pending events anymore, they complete, and only the remaining split 35-67 is snapshotted.
 # Only after some time does the stage related to subtask failover execute and trigger the failover.
 # However, after the failover, the job recovers from checkpoint 8, which contains only the split 35-67. AddSplit[35-67] is then emitted and processed, so the split 68-100 is missed.
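The ordering in the steps above can be replayed deterministically in a few lines. This is a simplified, single-threaded sketch under stated assumptions: `LostSplitReplay` and its fields are hypothetical, the coordinator state is reduced to a list of unassigned splits plus a list of pending AddSplit events, and "restoring from checkpoint 8" simply re-sends every split contained in that snapshot.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, single-threaded replay of the event ordering described above.
 * All names are illustrative; this is not Flink's SourceCoordinator or reader code.
 */
final class LostSplitReplay {

    /** A split covering the records [from, to], both inclusive. */
    static final class Split {
        final int from;
        final int to;

        Split(int from, int to) {
            this.from = from;
            this.to = to;
        }
    }

    /** Runs the scenario and returns every record the sink ends up collecting. */
    static List<Integer> run() {
        List<Split> unassigned = new ArrayList<>(); // splits still owned by the coordinator
        unassigned.add(new Split(1, 34));
        unassigned.add(new Split(35, 67));
        unassigned.add(new Split(68, 100));
        List<Split> pendingEvents = new ArrayList<>(); // AddSplit events not yet confirmed
        List<Integer> collected = new ArrayList<>();

        // Step 2: AddSplit[1-34] is delivered and fully processed.
        emit(unassigned.remove(0), collected);

        // Step 3: AddSplit[68-100] is sent, but the send fails with a timeout.
        // The split has left 'unassigned' and now lives only in pendingEvents.
        pendingEvents.add(unassigned.remove(unassigned.size() - 1));

        // Step 4: the checkpoint stage runs first; checkpoint 5 is aborted and the
        // failed pending event is discarded, so nothing tracks split 68-100 anymore.
        pendingEvents.clear();

        // Step 5: checkpoints 6 to 8 succeed; checkpoint 8 only contains split 35-67.
        List<Split> checkpoint8 = new ArrayList<>(unassigned);

        // Steps 6-7: the delayed failover restores from checkpoint 8 and re-sends
        // every split in that snapshot.
        for (Split split : checkpoint8) {
            emit(split, collected);
        }
        return collected;
    }

    /** Simulates the reader processing one split and the sink collecting its records. */
    private static void emit(Split split, List<Integer> out) {
        for (int record = split.from; record <= split.to; record++) {
            out.add(record);
        }
    }

    public static void main(String[] args) {
        List<Integer> collected = run();
        System.out.println(collected.size()); // 67: records 68..100 never reach the sink
    }
}
```

Running `main` prints 67: records 1 to 67 are collected and 68 to 100 are missing, which matches the actual list in the assertion failure of the quoted test report.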

The failure can be reproduced locally by simulating the above case, as in [https://github.com/gaoyunhaii/flink/commits/fix_oc_cp].

 

> OperatorEventSendingCheckpointITCase.testOperatorEventLostWithReaderFailure fails on azure
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23233
>                 URL: https://issues.apache.org/jira/browse/FLINK-23233
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.1
>            Reporter: Xintong Song
>            Assignee: Yun Gao
>            Priority: Major
>             Fix For: 1.13.2
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19857&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=c2734c79-73b6-521c-e85a-67c7ecae9107&l=9382
> {code}
> Jul 03 01:37:31 [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 21.415 s <<< FAILURE! - in org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase
> Jul 03 01:37:31 [ERROR] testOperatorEventLostWithReaderFailure(org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase)  Time elapsed: 3.623 s  <<< FAILURE!
> Jul 03 01:37:31 java.lang.AssertionError: expected:<[1, 2, 3, 4, 5, 6, 7, 8, 
> 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 
> 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]> but 
> was:<[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 
> 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 
> 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 
> 59, 60, 61, 62, 63, 64, 65, 66, 67]>
> Jul 03 01:37:31       at org.junit.Assert.fail(Assert.java:88)
> Jul 03 01:37:31       at org.junit.Assert.failNotEquals(Assert.java:834)
> Jul 03 01:37:31       at org.junit.Assert.assertEquals(Assert.java:118)
> Jul 03 01:37:31       at org.junit.Assert.assertEquals(Assert.java:144)
> Jul 03 01:37:31       at org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase.runTest(OperatorEventSendingCheckpointITCase.java:254)
> Jul 03 01:37:31       at org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase.testOperatorEventLostWithReaderFailure(OperatorEventSendingCheckpointITCase.java:143)
> Jul 03 01:37:31       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Jul 03 01:37:31       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 03 01:37:31       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 03 01:37:31       at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 03 01:37:31       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jul 03 01:37:31       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 03 01:37:31       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jul 03 01:37:31       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 03 01:37:31       at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 03 01:37:31       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jul 03 01:37:31       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jul 03 01:37:31       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jul 03 01:37:31       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jul 03 01:37:31       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
