[ 
https://issues.apache.org/jira/browse/FLINK-27148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520331#comment-17520331
 ] 

Yun Gao edited comment on FLINK-27148 at 4/11/22 6:43 AM:
----------------------------------------------------------

With some checks I think this issue is caused by the modification in 
https://issues.apache.org/jira/browse/FLINK-26394, [~pltbkd] could you have a 
look? 

The scenarios of this issue is
# The timer thread starts triggering a checkpoint
# in createPendingCheckpoint, after putting the checkpoint into the 
pendingCheckpoints, the timer thread given up the CPU.
# The JM main thread start stopping the CheckpointCoordinator, which further 
aborts all the pending checkpoints. 

The story was changed from here. Previously it would
# The JM main thread abort the pending checkpoint.
# The timer thread resume. it does not check whether the pending checkpoint is 
aborted, thus it continues to triggering the operator coordinators. It would 
mark the valve currentCheckpointId = 11.
# After that it starts triggering the tasks. However, a check here found that 
the pending checkpoint is aborted, thus it called onTriggerFailure, which 
further reset the valve currentCheckpointId to None.


After the change
# The JM main thread abort the pending checkpoint. However, with the newly 
introduced masterTriggerCompletePromise to be canceled, it would insert a new 
onTriggerFailure task to the timer thread.
# The timer thread executes onTriggerFailure and reset the valve 
currentCheckpointId to None.
# The timer thread resume the triggering as before, it continues to triggering 
the operator coordinators. It would mark the valve currentCheckpointId = 11.
# However, since now masterTriggerCompletePromise has been completed, the 
triggering stop after the source coordinator acknowledged. There won't be 
onTriggerFailure get executed after that, thus the valve currentCheckpointId = 
11 is kept.

As a whole, currently the trigger is split into multiple steps. Between any two 
steps the pending checkpoints might be aborted. We need to check this state 
carefully in the start of any step to avoid inconsistency. 

Since this PR is only merged onto master, I think the release-1.15 is still ok. 
I'll first remove 1.15 from the affected and fixed versions. 


was (Author: gaoyunhaii):
With some checks I think this issue is caused by the modification in 
https://issues.apache.org/jira/browse/FLINK-26394, [~pltbkd] could you have a 
look? 

The scenarios of this issue is
1. The timer thread starts triggering a checkpoint
2. in createPendingCheckpoint, after putting the checkpoint into the 
pendingCheckpoints, the timer thread given up the CPU.
3. The JM main thread start stopping the CheckpointCoordinator, which further 
aborts all the pending checkpoints. 

The story was changed from here. Previously it would
1. The JM main thread abort the pending checkpoint.
2. The timer thread resume. it does not check whether the pending checkpoint is 
aborted, thus it continues to triggering the operator coordinators. It would 
mark the valve currentCheckpointId = 11.
3. After that it starts triggering the tasks. However, a check here found that 
the pending checkpoint is aborted, thus it called onTriggerFailure, which 
further reset the valve currentCheckpointId to None.


After the change
1. The JM main thread abort the pending checkpoint. However, with the newly 
introduced masterTriggerCompletePromise to be canceled, it would insert a new 
onTriggerFailure task to the timer thread.
2. The timer thread executes onTriggerFailure and reset the valve 
currentCheckpointId to None.
3. The timer thread resume the triggering as before, it continues to triggering 
the operator coordinators. It would mark the valve currentCheckpointId = 11.
4. However, since now masterTriggerCompletePromise has been completed, the 
triggering stop after the source coordinator acknowledged. There won't be 
onTriggerFailure get executed after that, thus the valve currentCheckpointId = 
11 is kept.

As a whole, currently the trigger is split into multiple steps. Between any two 
steps the pending checkpoints might be aborted. We need to check this state 
carefully in the start of any step to avoid inconsistency. 

Since this PR is only merged onto master, I think the release-1.15 is still ok. 
I'll first remove 1.15 from the affected and fixed versions. 

> UnalignedCheckpointITCase fails on AZP
> --------------------------------------
>
>                 Key: FLINK-27148
>                 URL: https://issues.apache.org/jira/browse/FLINK-27148
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Network
>    Affects Versions: 1.16.0
>            Reporter: Roman Khachatryan
>            Priority: Blocker
>             Fix For: 1.16.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba&l=5812]
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394&view=logs&j=baf26b34-3c6a-54e8-f93f-cf269b32f802&t=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9&l=6018]
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34448&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=41655]
> Relevant error message:
>  
> {code:java}
> Caused by: java.lang.IllegalStateException: Cannot mark for checkpoint 12, 
> already marked for checkpoint 11
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
>  {code}
>  
> {code:java}
> [ERROR] Tests run: 22, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 
> 174.732 s <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.UnalignedCheckpointITCase
> [ERROR] UnalignedCheckpointITCase.execute  Time elapsed: 6.408 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>       at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:184)
>       at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:287)
>       at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>       at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>       at org.junit.runners.Suite.runChild(Suite.java:128)
>       at org.junit.runners.Suite.runChild(Suite.java:27)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>       at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>       at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>       at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>       at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
>       at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
>       at 
> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
>       at 
> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
>       at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
>       at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
>       at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> Caused by: java.lang.IllegalStateException: Cannot mark for checkpoint 12, 
> already marked for checkpoint 11
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:302)
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:230)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
>       at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>       at akka.actor.Actor.aroundReceive(Actor.scala:537)
>       at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>       at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to