[ https://issues.apache.org/jira/browse/FLINK-29217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651958#comment-17651958 ]
Yunfeng Zhou commented on FLINK-29217:
--------------------------------------
According to an offline discussion with Becket Qin and Dong Lin, the
OperatorCoordinator's support for saving buffered operator events in the face
of concurrent checkpoints is not yet good enough, so we need to temporarily and
partially disable concurrent handling of multiple checkpoints in
OperatorCoordinator.
A short-term solution is as follows.
- If a new checkpoint is triggered on an OperatorCoordinatorHolder while
another checkpoint is still in progress, and the new checkpoint cannot be
subsumed (e.g. it is a savepoint rather than an automatically triggered
checkpoint), the new checkpoint would be processed concurrently, and all
blocked OperatorEvents would be regarded as generated after the new checkpoint
was triggered (i.e. they would not be saved to the snapshot of the new
checkpoint).
- If a new checkpoint is triggered on an OperatorCoordinatorHolder while
another checkpoint is still in progress, and the new checkpoint can be
subsumed, the new checkpoint would be temporarily blocked until all ongoing
checkpoints have finished.
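To make the two cases above concrete, here is a minimal sketch of the admission
decision. It is only a model of the policy described in this comment, not the
actual OperatorCoordinatorHolder code: the class name, the Decision enum, and
the onTrigger/onFinished methods are all hypothetical, and event buffering is
left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of the short-term policy; not Flink's real implementation. */
class CheckpointAdmissionSketch {
    enum Decision { START, START_CONCURRENTLY, BLOCK }

    // Checkpoints currently being processed by this (hypothetical) holder.
    private final List<Long> ongoing = new ArrayList<>();
    // Subsumable checkpoints waiting for the ongoing ones to finish.
    private final Queue<Long> blocked = new ArrayDeque<>();

    Decision onTrigger(long checkpointId, boolean canBeSubsumed) {
        if (ongoing.isEmpty()) {
            ongoing.add(checkpointId);
            return Decision.START;
        }
        if (!canBeSubsumed) {
            // E.g. a savepoint: process it concurrently. Operator events that are
            // blocked at this point would be treated as generated after this
            // checkpoint, so they would not go into its snapshot.
            ongoing.add(checkpointId);
            return Decision.START_CONCURRENTLY;
        }
        // A regular checkpoint waits until all ongoing checkpoints have finished.
        blocked.add(checkpointId);
        return Decision.BLOCK;
    }

    /** Marks a checkpoint as finished and returns the checkpoints released by that. */
    List<Long> onFinished(long checkpointId) {
        ongoing.remove(Long.valueOf(checkpointId));
        List<Long> released = new ArrayList<>();
        if (ongoing.isEmpty() && !blocked.isEmpty()) {
            long next = blocked.poll();
            ongoing.add(next);
            released.add(next);
        }
        return released;
    }
}
```

In this sketch only one blocked checkpoint is released at a time, so any
further blocked checkpoints keep waiting behind it. That is one possible
reading of "blocked until all ongoing checkpoints have finished".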
A long-term solution could be to make OperatorCoordinators generate checkpoint
barriers and send them to their subtasks. The subtasks would need to align
these barriers with the ones they receive from upstream operators or sources,
and would actually trigger the checkpoint only once barrier alignment is
complete.
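As a rough illustration of the alignment idea, the sketch below shows a subtask
that triggers a checkpoint only after it has seen the barrier from the
coordinator and from every upstream channel. All names here are hypothetical
(BarrierAlignmentSketch, the "coordinator" origin label, string channel ids),
and this is not how Flink's existing barrier handling is implemented.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical subtask-side alignment of coordinator and upstream barriers. */
class BarrierAlignmentSketch {
    static final String COORDINATOR = "coordinator";

    // Every origin that must deliver a barrier before the checkpoint is triggered.
    private final Set<String> expected;
    private final Set<String> received = new HashSet<>();
    private long currentCheckpointId = -1L;

    BarrierAlignmentSketch(Set<String> upstreamChannels) {
        this.expected = new HashSet<>(upstreamChannels);
        this.expected.add(COORDINATOR);
    }

    /** Returns true when this barrier completes alignment for its checkpoint. */
    boolean onBarrier(String origin, long checkpointId) {
        if (!expected.contains(origin)) {
            throw new IllegalArgumentException("Unknown barrier origin: " + origin);
        }
        if (checkpointId < currentCheckpointId) {
            return false; // Stale barrier of an older checkpoint: ignore it.
        }
        if (checkpointId > currentCheckpointId) {
            // A newer checkpoint starts a fresh alignment.
            currentCheckpointId = checkpointId;
            received.clear();
        }
        received.add(origin);
        if (received.equals(expected)) {
            received.clear();
            return true; // All barriers aligned: the subtask may snapshot now.
        }
        return false;
    }
}
```

With this shape, the coordinator's barrier is just one more input to align,
which is why events sent before it would fall before the checkpoint and
events sent after it would fall after the checkpoint.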
> CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
> failed with AssertionFailedError
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-29217
> URL: https://issues.apache.org/jira/browse/FLINK-29217
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.16.0
> Reporter: Xingbo Huang
> Assignee: Yunfeng Zhou
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.16.1
>
>
> {code:java}
> 2022-09-07T02:00:50.2507464Z Sep 07 02:00:50 [ERROR]
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
> Time elapsed: 2.137 s <<< FAILURE!
> 2022-09-07T02:00:50.2508673Z Sep 07 02:00:50
> org.opentest4j.AssertionFailedError:
> 2022-09-07T02:00:50.2509309Z Sep 07 02:00:50
> 2022-09-07T02:00:50.2509945Z Sep 07 02:00:50 Expecting value to be false but
> was true
> 2022-09-07T02:00:50.2511950Z Sep 07 02:00:50 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 2022-09-07T02:00:50.2513254Z Sep 07 02:00:50 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 2022-09-07T02:00:50.2514621Z Sep 07 02:00:50 at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 2022-09-07T02:00:50.2516342Z Sep 07 02:00:50 at
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint(CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:173)
> 2022-09-07T02:00:50.2517852Z Sep 07 02:00:50 at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-09-07T02:00:50.2518888Z Sep 07 02:00:50 at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-09-07T02:00:50.2520065Z Sep 07 02:00:50 at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-09-07T02:00:50.2521153Z Sep 07 02:00:50 at
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-09-07T02:00:50.2522747Z Sep 07 02:00:50 at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-09-07T02:00:50.2523973Z Sep 07 02:00:50 at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-09-07T02:00:50.2525158Z Sep 07 02:00:50 at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-09-07T02:00:50.2526347Z Sep 07 02:00:50 at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-09-07T02:00:50.2527525Z Sep 07 02:00:50 at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-09-07T02:00:50.2528646Z Sep 07 02:00:50 at
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-09-07T02:00:50.2529708Z Sep 07 02:00:50 at
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-09-07T02:00:50.2530744Z Sep 07 02:00:50 at
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-09-07T02:00:50.2532008Z Sep 07 02:00:50 at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-09-07T02:00:50.2533137Z Sep 07 02:00:50 at
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-09-07T02:00:50.2544265Z Sep 07 02:00:50 at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-09-07T02:00:50.2545595Z Sep 07 02:00:50 at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-09-07T02:00:50.2546782Z Sep 07 02:00:50 at
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-09-07T02:00:50.2547810Z Sep 07 02:00:50 at
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-09-07T02:00:50.2548890Z Sep 07 02:00:50 at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-09-07T02:00:50.2549932Z Sep 07 02:00:50 at
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-09-07T02:00:50.2550933Z Sep 07 02:00:50 at
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-09-07T02:00:50.2552325Z Sep 07 02:00:50 at
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-09-07T02:00:50.2553660Z Sep 07 02:00:50 at
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2022-09-07T02:00:50.2554661Z Sep 07 02:00:50 at
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-09-07T02:00:50.2555590Z Sep 07 02:00:50 at
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 2022-09-07T02:00:50.2556454Z Sep 07 02:00:50 at
> org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 2022-09-07T02:00:50.2557291Z Sep 07 02:00:50 at
> org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> 2022-09-07T02:00:50.2558317Z Sep 07 02:00:50 at
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
> 2022-09-07T02:00:50.2559462Z Sep 07 02:00:50 at
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
> 2022-09-07T02:00:50.2560581Z Sep 07 02:00:50 at
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> 2022-09-07T02:00:50.2562110Z Sep 07 02:00:50 at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
> 2022-09-07T02:00:50.2563590Z Sep 07 02:00:50 at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
> 2022-09-07T02:00:50.2564992Z Sep 07 02:00:50 at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
> 2022-09-07T02:00:50.2566400Z Sep 07 02:00:50 at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
> 2022-09-07T02:00:50.2567801Z Sep 07 02:00:50 at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
> 2022-09-07T02:00:50.2569115Z Sep 07 02:00:50 at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
> 2022-09-07T02:00:50.2570303Z Sep 07 02:00:50 at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
> 2022-09-07T02:00:50.2572140Z Sep 07 02:00:50 at
> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
> 2022-09-07T02:00:50.2573462Z Sep 07 02:00:50 at
> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
> 2022-09-07T02:00:50.2574744Z Sep 07 02:00:50 at
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
> 2022-09-07T02:00:50.2576081Z Sep 07 02:00:50 at
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
> 2022-09-07T02:00:50.2577397Z Sep 07 02:00:50 at
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
> 2022-09-07T02:00:50.2578627Z Sep 07 02:00:50 at
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
> 2022-09-07T02:00:50.2579773Z Sep 07 02:00:50 at
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
> 2022-09-07T02:00:50.2580911Z Sep 07 02:00:50 at
> org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
> 2022-09-07T02:00:50.2582658Z Sep 07 02:00:50 at
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=40763&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8