[ 
https://issues.apache.org/jira/browse/FLINK-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16999249#comment-16999249
 ] 

Andrey Zagrebin commented on FLINK-15247:
-----------------------------------------

It seems the problem is a bit deeper.

We introduced the delayed TaskExecutor#stopTaskExecutorServices in case of 
stopping TM in FLINK-11630 to gracefully cancel the tasks and let them release 
the resources of services before the final shutdown of services. Previously, 
slots were static, there were no resources attached to slots and TaskSlotTable 
was stateless, that did not cause issues.

Now TaskSlotTable has dynamically allocated slots after 
[FLINK-14189|https://jira.apache.org/jira/browse/FLINK-14189] and memory 
manager is attached to the slot after 
[FLINK-14400|https://jira.apache.org/jira/browse/FLINK-14400]. It means that 
when we cancel now tasks it triggers stopping of the services (including 
TaskSlotTable) concurrently with freeing slots and their resources. Even we put 
stopping of the services into the main thread, freeing of slots can be after 
stopping. It would be cleaner to firstly cancel tasks, wait for the subsequent 
freeing of the slots and then shutdown services.

cc [[email protected]]

> Closing (Testing)MiniCluster may cause ConcurrentModificationException
> ----------------------------------------------------------------------
>
>                 Key: FLINK-15247
>                 URL: https://issues.apache.org/jira/browse/FLINK-15247
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.10.0
>            Reporter: Gary Yao
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.10.0
>
>
> {noformat}
> Test 
> operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase)
>  failed with:
> org.apache.flink.util.FlinkException: Could not close resource.
>         at 
> org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:165)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>         at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>         at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
>         at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>         at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>         at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>         at org.junit.runners.Suite.runChild(Suite.java:128)
>         at org.junit.runners.Suite.runChild(Suite.java:27)
>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>         at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>         at 
> org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
>         at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
>         at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
>         at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
>         at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
>         at 
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158)
>         at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>         at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>         at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
>         at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> Caused by: org.apache.flink.util.FlinkException: Error while shutting the 
> TaskExecutor down.
>         at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:397)
>         at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$0(TaskExecutor.java:382)
>         at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>         at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$runAfterwardsAsync$16(FutureUtils.java:518)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>         at 
> java.util.concurrent.CompletableFuture$UniCompletion.claim(CompletableFuture.java:529)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:751)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:699)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not properly shut down 
> the TaskManager services.
>         at 
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:198)
>         at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:419)
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$runAfterwardsAsync$16(FutureUtils.java:512)
>         ... 16 more
> Caused by: java.util.ConcurrentModificationException
>         at java.util.HashMap$Values.forEach(HashMap.java:984)
>         at 
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable.stop(TaskSlotTable.java:155)
>         at 
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:184)
>         ... 18 more
> {noformat}
> https://api.travis-ci.com/v3/job/266467337/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to