[
https://issues.apache.org/jira/browse/FLINK-23233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375870#comment-17375870
]
Yun Gao commented on FLINK-23233:
---------------------------------
It seems the issue is caused by this part:
{code:java}
01:37:29,362 [ Thread-49] INFO
org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] -
Invalid request. Received version = 8a0b400e-0aa9-4223-89dc-f7638a67a845,
offset = 0, while expected version = 99a5868b-4923-46ec-ad60-58443bdba519,
offset = 0
01:37:29,764 [ Checkpoint Timer] WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to
trigger checkpoint 5 for job a4214b0065b42350a8831b7ce4a32ad2. (1 consecutive
failed attempts so far)
org.apache.flink.util.FlinkException: Failing OperatorCoordinator checkpoint
because some OperatorEvents before this checkpoint barrier were not received by
the target tasks.
at
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$completeCheckpointOnceEventsAreDone$4(OperatorCoordinatorHolder.java:344)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:?]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
~[?:1.8.0_282]
at
org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:905)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
~[?:1.8.0_282]
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$23(FutureUtils.java:1356)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
~[?:1.8.0_282]
at
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_282]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_282]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_282]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_282]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_282]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_282]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
01:37:29,766 [ Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 6 (type=CHECKPOINT) @ 1625276249765 for job
a4214b0065b42350a8831b7ce4a32ad2.
01:37:29,768 [jobmanager-future-thread-1] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
checkpoint 6 for job a4214b0065b42350a8831b7ce4a32ad2 (942 bytes in 3 ms).
01:37:29,768 [SourceCoordinator-Source: numbers -> Map -> Sink: Data stream
collect sink] INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
checkpoint 6 as completed for source Source: numbers -> Map -> Sink: Data
stream collect sink.
01:37:29,769 [ Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 7 (type=CHECKPOINT) @ 1625276249768 for job
a4214b0065b42350a8831b7ce4a32ad2.
01:37:29,770 [jobmanager-future-thread-2] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
checkpoint 7 for job a4214b0065b42350a8831b7ce4a32ad2 (942 bytes in 2 ms).
01:37:29,770 [SourceCoordinator-Source: numbers -> Map -> Sink: Data stream
collect sink] INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
checkpoint 7 as completed for source Source: numbers -> Map -> Sink: Data
stream collect sink.
01:37:29,771 [ Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 8 (type=CHECKPOINT) @ 1625276249770 for job
a4214b0065b42350a8831b7ce4a32ad2.
01:37:29,772 [jobmanager-future-thread-1] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
checkpoint 8 for job a4214b0065b42350a8831b7ce4a32ad2 (942 bytes in 2 ms).
01:37:29,773 [flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
numbers -> Map -> Sink: Data stream collect sink (1/1)
(f37c51a50966c297173f7852774e54f2) switched from RUNNING to FAILED on
0057f3d7-309e-4ba2-8b12-99d2269c5aa7 @ localhost (dataPort=-1).
org.apache.flink.util.FlinkException: An OperatorEvent from an
OperatorCoordinator to a task was lost. Triggering task failover to ensure
consistency. Event: 'AddSplitEvents[[[B@6c71f390]]', targetTask: Source:
numbers -> Map -> Sink: Data stream collect sink (1/1) - execution #1
at
org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.lambda$sendEvent$0(SubtaskGatewayImpl.java:81)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:?]
at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
~[?:1.8.0_282]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
{code}
Currently for the OperatorCoordinator, each event send action is bound to a
future, and two actions rely on the result of that future:
# When taking a checkpoint, it checks whether all the previous events were sent
successfully; if not, it fails the checkpoint. But during this process,
it cleans up the pending previous events.
# For each event, if the sending fails, it triggers a failover for the
subtask.
Then in this case:
# The job has only one task (source -> map -> sink (1/1)), the source has 3
splits, namely 1 - 34, 35 - 67, 68 - 100.
# With some prior execution, AddSplit[1-34] is emitted and processed. Now
only two splits remain.
# Then, as the above log shows, the next AddSplit[68-100] is emitted before
checkpoint 5, and the test deliberately makes the event sending fail due to a
timeout.
# The stage related to the checkpoint executes first and aborts checkpoint 5. In
this process it removes the pending event, so now only 1 split remains.
# Now new checkpoints are triggered. Since there are now no failed pending
events, the checkpoints complete, and only the remaining split 35-67 is
snapshotted.
# Then after some time the stage related to subtask failover executes and
triggers the failover.
# However, after the failover, the job recovers from checkpoint 8, which
contains only the split 35-67. Then AddSplit[35-67] is emitted and executed.
Thus the split 68-100 is missed.
The failure can be reproduced locally by simulating the above case, as in
[https://github.com/gaoyunhaii/flink/commits/fix_oc_cp] .
> OperatorEventSendingCheckpointITCase.testOperatorEventLostWithReaderFailure
> fails on azure
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-23233
> URL: https://issues.apache.org/jira/browse/FLINK-23233
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.13.1
> Reporter: Xintong Song
> Assignee: Yun Gao
> Priority: Major
> Fix For: 1.13.2
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19857&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=c2734c79-73b6-521c-e85a-67c7ecae9107&l=9382
> {code}
> Jul 03 01:37:31 [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0,
> Time elapsed: 21.415 s <<< FAILURE! - in
> org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase
> Jul 03 01:37:31 [ERROR]
> testOperatorEventLostWithReaderFailure(org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase)
> Time elapsed: 3.623 s <<< FAILURE!
> Jul 03 01:37:31 java.lang.AssertionError: expected:<[1, 2, 3, 4, 5, 6, 7, 8,
> 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27,
> 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46,
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65,
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84,
> 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]> but
> was:<[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
> 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
> 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58,
> 59, 60, 61, 62, 63, 64, 65, 66, 67]>
> Jul 03 01:37:31 at org.junit.Assert.fail(Assert.java:88)
> Jul 03 01:37:31 at org.junit.Assert.failNotEquals(Assert.java:834)
> Jul 03 01:37:31 at org.junit.Assert.assertEquals(Assert.java:118)
> Jul 03 01:37:31 at org.junit.Assert.assertEquals(Assert.java:144)
> Jul 03 01:37:31 at
> org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase.runTest(OperatorEventSendingCheckpointITCase.java:254)
> Jul 03 01:37:31 at
> org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase.testOperatorEventLostWithReaderFailure(OperatorEventSendingCheckpointITCase.java:143)
> Jul 03 01:37:31 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> Jul 03 01:37:31 at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 03 01:37:31 at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 03 01:37:31 at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 03 01:37:31 at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jul 03 01:37:31 at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 03 01:37:31 at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jul 03 01:37:31 at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 03 01:37:31 at
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 03 01:37:31 at
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jul 03 01:37:31 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jul 03 01:37:31 at
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jul 03 01:37:31 at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jul 03 01:37:31 at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)