[
https://issues.apache.org/jira/browse/FLINK-27148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520331#comment-17520331
]
Yun Gao edited comment on FLINK-27148 at 4/11/22 6:43 AM:
----------------------------------------------------------
After some checking, I think this issue is caused by the change in
https://issues.apache.org/jira/browse/FLINK-26394. [~pltbkd], could you have a
look?
The scenario is as follows:
# The timer thread starts triggering a checkpoint.
# In createPendingCheckpoint, after putting the checkpoint into
pendingCheckpoints, the timer thread gives up the CPU.
# The JM main thread starts stopping the CheckpointCoordinator, which in turn
aborts all pending checkpoints.
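To make the window concrete, here is a minimal Java sketch that forces steps 1-3 into exactly this order using two latches. PendingCheckpoint and RaceWindowDemo are hypothetical, heavily simplified stand-ins, not Flink's classes; the only point is that a checkpoint already registered in pendingCheckpoints can be aborted by another thread before the timer thread continues.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for a pending checkpoint (not Flink's class).
class PendingCheckpoint {
    final long id;
    private volatile boolean disposed;

    PendingCheckpoint(long id) { this.id = id; }

    void abort() { disposed = true; }

    boolean isDisposed() { return disposed; }
}

class RaceWindowDemo {
    final Map<Long, PendingCheckpoint> pendingCheckpoints = new ConcurrentHashMap<>();

    /** Forces steps 1-3 in order; returns true if the resumed timer thread finds its checkpoint aborted. */
    boolean run(long checkpointId) {
        CountDownLatch registered = new CountDownLatch(1); // step 2 reached
        CountDownLatch aborted = new CountDownLatch(1);    // step 3 done
        AtomicBoolean sawAbort = new AtomicBoolean();

        Thread timer = new Thread(() -> {
            // Steps 1-2: create the pending checkpoint and register it ...
            PendingCheckpoint checkpoint = new PendingCheckpoint(checkpointId);
            pendingCheckpoints.put(checkpointId, checkpoint);
            registered.countDown();
            try {
                // ... then "give up the CPU" until the main thread has aborted it.
                aborted.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            sawAbort.set(checkpoint.isDisposed());
        }, "checkpoint-timer");
        timer.start();

        try {
            registered.await();
            // Step 3: stopping the coordinator aborts every pending checkpoint.
            pendingCheckpoints.values().forEach(PendingCheckpoint::abort);
            pendingCheckpoints.clear();
            aborted.countDown();
            timer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while driving the interleaving", e);
        }
        return sawAbort.get();
    }
}
```

Whatever the timer thread does after resuming therefore has to assume the checkpoint may already be gone.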
From here on, the behavior differs. Previously:
# The JM main thread aborts the pending checkpoint.
# The timer thread resumes. It does not check whether the pending checkpoint has
been aborted, so it continues triggering the operator coordinators, which marks
the valve with currentCheckpointId = 11.
# It then starts triggering the tasks. A check at this point finds that the
pending checkpoint has been aborted, so it calls onTriggerFailure, which resets
the valve's currentCheckpointId to None.
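A single-threaded sketch of the previous ordering. It assumes the valve rejects a mark for a different checkpoint while one is already marked, modeled only on the error message in this issue; Valve and OldTriggerSequence are hypothetical, not Flink's OperatorEventValve or CheckpointCoordinator. Because the task-trigger step still noticed the abort, the valve ends up clean and the next checkpoint can be marked.

```java
import java.util.OptionalLong;

// Hypothetical valve modeled on the error message in this issue; not Flink's OperatorEventValve.
class Valve {
    private OptionalLong currentCheckpointId = OptionalLong.empty();

    void markForCheckpoint(long checkpointId) {
        if (currentCheckpointId.isPresent() && currentCheckpointId.getAsLong() != checkpointId) {
            throw new IllegalStateException(String.format(
                    "Cannot mark for checkpoint %d, already marked for checkpoint %d",
                    checkpointId, currentCheckpointId.getAsLong()));
        }
        currentCheckpointId = OptionalLong.of(checkpointId);
    }

    void reset() { currentCheckpointId = OptionalLong.empty(); }

    OptionalLong currentCheckpointId() { return currentCheckpointId; }
}

class OldTriggerSequence {
    final Valve valve = new Valve();
    private boolean disposed;

    void run(long checkpointId) {
        // Step 1: the main thread aborts the pending checkpoint (only a flag flips).
        disposed = true;
        // Step 2: the timer thread resumes without checking and triggers the coordinators.
        valve.markForCheckpoint(checkpointId);
        // Step 3: the check before triggering tasks sees the abort and cleans up.
        if (disposed) {
            onTriggerFailure();
        }
    }

    private void onTriggerFailure() { valve.reset(); }
}
```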
After the change:
# The JM main thread aborts the pending checkpoint. However, because the newly
introduced masterTriggerCompletePromise is canceled as part of the abort, a new
onTriggerFailure task is also enqueued on the timer thread.
# The timer thread executes onTriggerFailure and resets the valve's
currentCheckpointId to None.
# The timer thread then resumes triggering as before: it continues triggering
the operator coordinators, which marks the valve with currentCheckpointId = 11.
# However, since masterTriggerCompletePromise has already been completed,
triggering stops after the source coordinator acknowledges. No further
onTriggerFailure is executed, so the valve keeps currentCheckpointId = 11, and
the next attempt to mark checkpoint 12 fails with "Cannot mark for checkpoint
12, already marked for checkpoint 11".
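The same hypothetical Valve, driven in the new order. Here the timer thread is modeled as a FIFO queue of Runnables, and masterTriggerCompletePromise as a plain CompletableFuture whose failure callback enqueues onTriggerFailure; both wirings are assumptions for illustration, not the actual CheckpointCoordinator code. The cleanup runs before the stale trigger resumes, so the stale mark for checkpoint 11 survives.

```java
import java.util.ArrayDeque;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical valve modeled on the error message in this issue; not Flink's OperatorEventValve.
class Valve {
    private OptionalLong currentCheckpointId = OptionalLong.empty();

    void markForCheckpoint(long checkpointId) {
        if (currentCheckpointId.isPresent() && currentCheckpointId.getAsLong() != checkpointId) {
            throw new IllegalStateException(String.format(
                    "Cannot mark for checkpoint %d, already marked for checkpoint %d",
                    checkpointId, currentCheckpointId.getAsLong()));
        }
        currentCheckpointId = OptionalLong.of(checkpointId);
    }

    void reset() { currentCheckpointId = OptionalLong.empty(); }

    OptionalLong currentCheckpointId() { return currentCheckpointId; }
}

class NewTriggerSequence {
    final Valve valve = new Valve();
    // Tasks queued for the single-threaded timer, run in FIFO order.
    private final Queue<Runnable> timerQueue = new ArrayDeque<>();
    private final CompletableFuture<Void> masterTriggerCompletePromise = new CompletableFuture<>();

    NewTriggerSequence() {
        // On failure or cancellation, schedule onTriggerFailure on the "timer thread".
        masterTriggerCompletePromise.whenComplete((ignored, error) -> {
            if (error != null) {
                timerQueue.add(this::onTriggerFailure);
            }
        });
    }

    void run(long checkpointId) {
        // Step 1: aborting cancels the promise, which enqueues onTriggerFailure.
        masterTriggerCompletePromise.cancel(false);
        // Step 2: the timer thread runs the queued onTriggerFailure first.
        while (!timerQueue.isEmpty()) {
            timerQueue.poll().run();
        }
        // Step 3: the suspended trigger resumes and triggers the coordinators.
        valve.markForCheckpoint(checkpointId);
        // Step 4: completing the promise is a no-op because it is already cancelled,
        // so no second onTriggerFailure is scheduled and the valve keeps the id.
        masterTriggerCompletePromise.complete(null);
    }

    private void onTriggerFailure() { valve.reset(); }
}
```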
Overall, triggering is currently split into multiple steps, and the pending
checkpoint may be aborted between any two of them. We need to check this state
carefully at the start of every step to avoid inconsistency.
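One possible shape for such per-step checks, as a sketch only: every stage of a hypothetical trigger chain re-checks an abort flag first, and a single whenComplete handler routes any failure to onTriggerFailure. GuardedTrigger, checkNotAborted, and the flag are illustrative names, not existing Flink APIs, and the real fix may look quite different.

```java
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;

// Hypothetical valve modeled on the error message in this issue; not Flink's OperatorEventValve.
class Valve {
    private OptionalLong currentCheckpointId = OptionalLong.empty();

    void markForCheckpoint(long checkpointId) {
        if (currentCheckpointId.isPresent() && currentCheckpointId.getAsLong() != checkpointId) {
            throw new IllegalStateException(String.format(
                    "Cannot mark for checkpoint %d, already marked for checkpoint %d",
                    checkpointId, currentCheckpointId.getAsLong()));
        }
        currentCheckpointId = OptionalLong.of(checkpointId);
    }

    void reset() { currentCheckpointId = OptionalLong.empty(); }

    OptionalLong currentCheckpointId() { return currentCheckpointId; }
}

class GuardedTrigger {
    final Valve valve = new Valve();
    private volatile boolean disposed; // set when the pending checkpoint is aborted
    int onTriggerFailureCalls;

    void abort() { disposed = true; }

    // Every step starts by re-checking whether the pending checkpoint is still alive.
    private long checkNotAborted(long checkpointId) {
        if (disposed) {
            throw new IllegalStateException("Pending checkpoint " + checkpointId + " was aborted");
        }
        return checkpointId;
    }

    /** afterCreatePending simulates whatever happens while the timer thread is descheduled. */
    CompletableFuture<Void> trigger(long checkpointId, Runnable afterCreatePending) {
        return CompletableFuture.completedFuture(checkpointId)
                .thenApply(id -> { afterCreatePending.run(); return id; })
                .thenApply(this::checkNotAborted)
                .thenApply(id -> { valve.markForCheckpoint(id); return id; }) // trigger coordinators
                .thenApply(this::checkNotAborted)
                .thenAccept(id -> { /* trigger tasks (omitted) */ })
                .whenComplete((ignored, error) -> {
                    // Any failed step ends up here exactly once.
                    if (error != null) {
                        onTriggerFailure();
                    }
                });
    }

    private void onTriggerFailure() {
        onTriggerFailureCalls++;
        valve.reset();
    }
}
```

With the abort injected right after the pending checkpoint is created, the coordinator step never runs, so nothing is left marked on the valve and checkpoint 12 can proceed.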
Since that PR was only merged into master, I think release-1.15 is not
affected. I'll remove 1.15 from the affected and fix versions for now.
was (Author: gaoyunhaii): earlier revision of the comment above.
> UnalignedCheckpointITCase fails on AZP
> --------------------------------------
>
> Key: FLINK-27148
> URL: https://issues.apache.org/jira/browse/FLINK-27148
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Network
> Affects Versions: 1.16.0
> Reporter: Roman Khachatryan
> Priority: Blocker
> Fix For: 1.16.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba&l=5812]
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394&view=logs&j=baf26b34-3c6a-54e8-f93f-cf269b32f802&t=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9&l=6018]
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34448&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=41655]
> Relevant error message:
>
> {code:java}
> Caused by: java.lang.IllegalStateException: Cannot mark for checkpoint 12, already marked for checkpoint 11
> at org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
> {code}
>
> {code:java}
> [ERROR] Tests run: 22, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 174.732 s <<< FAILURE! - in org.apache.flink.test.checkpointing.UnalignedCheckpointITCase
> [ERROR] UnalignedCheckpointITCase.execute Time elapsed: 6.408 s <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:184)
> at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:287)
> at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
> at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
> at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
> at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
> at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
> at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
> at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
> at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
> at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
> at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
> at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
> at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
> at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
> at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> Caused by: java.lang.IllegalStateException: Cannot mark for checkpoint 12, already marked for checkpoint 11
> at org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
> at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:302)
> at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:230)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)