[
https://issues.apache.org/jira/browse/FLINK-22345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327014#comment-17327014
]
Stephan Ewen edited comment on FLINK-22345 at 4/22/21, 12:05 AM:
-----------------------------------------------------------------
With that fix applied, the scenario that previously trapped the scheduler in
the assertion-failure loop now leads to a different odd behavior:
The scheduler calls "restoreState()" to recover from a global failure in a
case where not all tasks have had new execution attempts created (here, it
seems that none of the tasks have). So the state is restored to the CANCELED
executions, not to the new executions.
{code}
22:36:19,156 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - operator-2 (1/1) (563c234c87f6e32c414755d47a9bca0a) switched from CANCELING to CANCELED.
22:36:19,160 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 1deff4638a055a66c9a19fffcea6c44f: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
22:36:19,186 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - operator-1 (1/1) (49a2e08ca4abb6aaf844475a0fbfdb21) switched from CANCELING to CANCELED.
22:36:19,194 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 1deff4638a055a66c9a19fffcea6c44f
22:36:20,085 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
22:36:20,085 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Resetting the Operator Coordinators to an empty state.
{code}
That trips another assertion in the coordinator code (which could be removed),
but it does seem strange that this happens at all.
I am wondering whether there is a bug in the scheduler that makes it
inconsistent when it deals with multiple concurrent global failures. Or will
it simply restore the state twice (once to the cancelled attempts, once again
to the recovered attempts)?
In this test run, the problem occurs in multiple profiles:
https://dev.azure.com/sewen0794/Flink/_build/results?buildId=291&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267&t=ab910030-93db-52a7-74a3-34a0addb481b
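To make the suspected inconsistency concrete, here is a minimal, self-contained sketch of the invariant in question: a restore should only target freshly created attempts, never attempts that are already CANCELED. This is not Flink code. The class RestoreGuardSketch, the Attempt type, the State enum, and safeToRestore() are all hypothetical names made up for illustration, and the real scheduler may enforce (or intentionally not enforce) this differently.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not Flink's scheduler or ExecutionGraph classes.
public class RestoreGuardSketch {

    // Simplified subset of execution states, assumed for this sketch.
    enum State { CREATED, RUNNING, CANCELING, CANCELED }

    // Hypothetical stand-in for the current execution attempt of one task.
    static final class Attempt {
        final String task;
        final int attemptNumber;
        final State state;

        Attempt(String task, int attemptNumber, State state) {
            this.task = task;
            this.attemptNumber = attemptNumber;
            this.state = state;
        }
    }

    // Returns true only if every current attempt is freshly created,
    // i.e. no task would have state restored into a stale attempt.
    static boolean safeToRestore(List<Attempt> currentAttempts) {
        for (Attempt attempt : currentAttempts) {
            if (attempt.state != State.CREATED) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Mirrors the log: both operators are CANCELED, no new attempts exist yet.
        List<Attempt> stale = Arrays.asList(
                new Attempt("operator-1", 0, State.CANCELED),
                new Attempt("operator-2", 0, State.CANCELED));

        // The expected situation: new attempts were created before the restore.
        List<Attempt> fresh = Arrays.asList(
                new Attempt("operator-1", 1, State.CREATED),
                new Attempt("operator-2", 1, State.CREATED));

        System.out.println("stale attempts safe to restore: " + safeToRestore(stale));
        System.out.println("fresh attempts safe to restore: " + safeToRestore(fresh));
    }
}
```

In this model, the situation from the log (all attempts CANCELED) is rejected, while the case with new attempts is accepted. Whether the scheduler should skip, defer, or repeat the restore in the first case is exactly the open question above.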
> CoordinatorEventsExactlyOnceITCase hangs on azure
> -------------------------------------------------
>
> Key: FLINK-22345
> URL: https://issues.apache.org/jira/browse/FLINK-22345
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.13.0
> Reporter: Dawid Wysakowicz
> Assignee: Stephan Ewen
> Priority: Critical
> Labels: test-stability
> Fix For: 1.13.0
>
> Attachments: screenshot-1.png
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16731&view=logs&j=02c4e775-43bf-5625-d1cc-542b5209e072&t=e5961b24-88d9-5c77-efd3-955422674c25&l=9896
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x00007fa8c800b800 nid=0x58b3 waiting on condition [0x00007fa8cfd1c000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000008147a7e8> (a java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:802)
> at org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase.test(CoordinatorEventsExactlyOnceITCase.java:187)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}