[
https://issues.apache.org/jira/browse/FLINK-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16999249#comment-16999249
]
Andrey Zagrebin commented on FLINK-15247:
-----------------------------------------
It seems the problem is a bit deeper.
We introduced the delayed TaskExecutor#stopTaskExecutorServices in case of
stopping TM in FLINK-11630 to gracefully cancel the tasks and let them release
the resources of services before the final shutdown of services. Previously,
slots were static, there were no resources attached to slots and TaskSlotTable
was stateless, that did not cause issues.
Now TaskSlotTable has dynamically allocated slots after
[FLINK-14189|https://jira.apache.org/jira/browse/FLINK-14189] and memory
manager is attached to the slot after
[FLINK-14400|https://jira.apache.org/jira/browse/FLINK-14400]. It means that
when we cancel now tasks it triggers stopping of the services (including
TaskSlotTable) concurrently with freeing slots and their resources. Even we put
stopping of the services into the main thread, freeing of slots can be after
stopping. It would be cleaner to firstly cancel tasks, wait for the subsequent
freeing of the slots and then shutdown services.
cc [[email protected]]
> Closing (Testing)MiniCluster may cause ConcurrentModificationException
> ----------------------------------------------------------------------
>
> Key: FLINK-15247
> URL: https://issues.apache.org/jira/browse/FLINK-15247
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.10.0
> Reporter: Gary Yao
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.10.0
>
>
> {noformat}
> Test
> operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase)
> failed with:
> org.apache.flink.util.FlinkException: Could not close resource.
> at
> org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:165)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
> at
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
> at
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
> at
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
> at
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
> at
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158)
> at
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> at
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> at
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> at
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> Caused by: org.apache.flink.util.FlinkException: Error while shutting the
> TaskExecutor down.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:397)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$0(TaskExecutor.java:382)
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$runAfterwardsAsync$16(FutureUtils.java:518)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
> at
> java.util.concurrent.CompletableFuture$UniCompletion.claim(CompletableFuture.java:529)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:751)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:699)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not properly shut down
> the TaskManager services.
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:198)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:419)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$runAfterwardsAsync$16(FutureUtils.java:512)
> ... 16 more
> Caused by: java.util.ConcurrentModificationException
> at java.util.HashMap$Values.forEach(HashMap.java:984)
> at
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable.stop(TaskSlotTable.java:155)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:184)
> ... 18 more
> {noformat}
> https://api.travis-ci.com/v3/job/266467337/log.txt
--
This message was sent by Atlassian Jira
(v8.3.4#803005)