[ https://issues.apache.org/jira/browse/FLINK-23647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430459#comment-17430459 ]

Piotr Nowojski commented on FLINK-23647:
----------------------------------------

Thanks [~roman] and [~arvid] for reporting and analysing this issue.
{quote}
IMO, the desired behavior is to wait for all checkpoints to be discarded before
reporting job termination to the client.

However, CheckpointsCleaner currently submits actions to
JobManagerSharedServices.scheduledExecutorService, which is terminated together
with the last job (i.e. on MiniCluster shutdown), and it still doesn't wait for
all tasks to terminate on shutdown.
A simple workaround would be to wait in CheckpointsCleaner until
numberOfCheckpointsToClean becomes 0 on termination (and to wait for the
executor tasks to terminate).
{quote}
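The workaround quoted above (on termination, wait until numberOfCheckpointsToClean drops to 0) can be sketched roughly as follows. This is a minimal illustration under assumptions, not Flink's {{CheckpointsCleaner}}: the class {{PendingCleanupTracker}} and its methods {{cleanupAsync}}, {{closeAsync}} and {{pendingCleanups}} are hypothetical names, and cleanup actions are modelled as plain {{Runnable}}s submitted to a {{java.util.concurrent.Executor}}.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/**
 * Hypothetical sketch, not Flink's actual CheckpointsCleaner: counts the cleanup
 * actions still in flight and lets the owner wait until that count drops to zero
 * before shared resources (such as the executor) are released.
 */
class PendingCleanupTracker {
    private final Object lock = new Object();

    /** Submitted cleanup actions that have not finished yet. Guarded by lock. */
    private int numberOfCheckpointsToClean;

    /** Created by the first closeAsync() call; completed when the count is 0. Guarded by lock. */
    private CompletableFuture<Void> closeFuture;

    /** Submits a cleanup action to the executor and tracks it until it finishes. */
    void cleanupAsync(Runnable cleanupAction, Executor executor) {
        synchronized (lock) {
            if (closeFuture != null) {
                throw new IllegalStateException("Cleaner is already closing.");
            }
            numberOfCheckpointsToClean++;
        }
        try {
            executor.execute(() -> {
                try {
                    cleanupAction.run();
                } finally {
                    onCleanupFinished();
                }
            });
        } catch (RejectedExecutionException e) {
            // The executor refused the task (e.g. it is already shut down), so the
            // action will never run; undo the increment before rethrowing.
            onCleanupFinished();
            throw e;
        }
    }

    private void onCleanupFinished() {
        CompletableFuture<Void> toComplete = null;
        synchronized (lock) {
            numberOfCheckpointsToClean--;
            if (numberOfCheckpointsToClean == 0 && closeFuture != null) {
                toComplete = closeFuture;
            }
        }
        // Complete outside the lock so dependent stages never run while holding it.
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }

    /** Stops accepting new cleanups; the future completes once all tracked ones are done. */
    CompletableFuture<Void> closeAsync() {
        synchronized (lock) {
            if (closeFuture == null) {
                closeFuture = new CompletableFuture<>();
                if (numberOfCheckpointsToClean == 0) {
                    closeFuture.complete(null); // nothing pending: already drained
                }
            }
            return closeFuture;
        }
    }

    int pendingCleanups() {
        synchronized (lock) {
            return numberOfCheckpointsToClean;
        }
    }
}
```

The sketch rejects new cleanups once closing has started, so after {{closeAsync()}} the counter can only go down and the returned future is guaranteed to complete once the in-flight actions finish.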
I agree that there are two options to fix this:
1. Wait for {{CheckpointsCleaner}} to finish before the job reaches a terminal state.
2. Wait for {{CheckpointsCleaner}} to finish before shutting down
{{AdaptiveScheduler}} and {{DefaultScheduler}}.
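Option 2 essentially orders the shutdown: the scheduler first waits for the cleaner to drain, and only then releases the executor the cleanup actions run on. A rough sketch, assuming a hypothetical {{DrainableCleaner}} interface whose {{closeAsync()}} future completes once all pending cleanups are done; {{SchedulerShutdownSketch}} is an illustrative name, not the actual {{AdaptiveScheduler}}/{{DefaultScheduler}} code, and the 30-second termination timeout is arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical abstraction over a cleaner that can report when it has drained. */
interface DrainableCleaner {
    /** Completes once every cleanup submitted so far has finished. */
    CompletableFuture<Void> closeAsync();
}

/**
 * Hypothetical scheduler shutdown illustrating option 2: the executor that runs
 * cleanup actions is shut down only after the cleaner reports it has drained.
 */
class SchedulerShutdownSketch {
    private final DrainableCleaner checkpointsCleaner;
    private final ExecutorService cleanupExecutor;

    SchedulerShutdownSketch(DrainableCleaner checkpointsCleaner, ExecutorService cleanupExecutor) {
        this.checkpointsCleaner = checkpointsCleaner;
        this.cleanupExecutor = cleanupExecutor;
    }

    CompletableFuture<Void> closeAsync() {
        // thenRunAsync (default async pool) rather than thenRun: the cleaner's future
        // may be completed on a cleanupExecutor thread, and that thread must not block
        // in awaitTermination() waiting for itself to exit.
        return checkpointsCleaner.closeAsync().thenRunAsync(() -> {
            cleanupExecutor.shutdown();
            try {
                if (!cleanupExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                    cleanupExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                cleanupExecutor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        });
    }
}
```

With this ordering, files may still be deleted after the job has reached its terminal state (the drawback of option 2 discussed below), but the executor is no longer torn down while deletions are pending.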

The main advantage of option 1 would be that files wouldn't be removed after the
job reaches a terminal state, which would still be possible with option 2.
However, I've discussed this offline with [~trohrmann], and we are not sure if
this is an issue. After all, we are aiming for clear ownership of
checkpoint/savepoint files, and checkpoint files would be owned by Flink, so
users shouldn't be concerned with what happens to those files. So for the time
being we would go with option 2, which is easier to implement.

> UnalignedCheckpointStressITCase crashed on azure
> ------------------------------------------------
>
>                 Key: FLINK-23647
>                 URL: https://issues.apache.org/jira/browse/FLINK-23647
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network, Tests
>    Affects Versions: 1.14.0
>            Reporter: Roman Khachatryan
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: stale-assigned, test-stability
>             Fix For: 1.15.0, 1.14.1
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21539&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba&l=4855
> When testing the DFS changelog implementation in FLINK-23279 with it enabled
> for all tests, UnalignedCheckpointStressITCase crashed with the following
> exception:
> {code}
> [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 18.433 s <<< FAILURE! - in org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase
> [ERROR] runStressTest(org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase)  Time elapsed: 17.663 s  <<< ERROR!
> java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/junit7860347244680665820/435237d57439f2ceadfedba74dadd6fa/chk-16
>    at java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:88)
>    at java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:104)
>    at java.util.Iterator.forEachRemaining(Iterator.java:115)
>    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>    at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:546)
>    at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.discoverRetainedCheckpoint(UnalignedCheckpointStressITCase.java:288)
>    at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.runAndTakeExternalCheckpoint(UnalignedCheckpointStressITCase.java:261)
>    at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase.runStressTest(UnalignedCheckpointStressITCase.java:157)
>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>    at java.lang.reflect.Method.invoke(Method.java:498)
>    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>    at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>    at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
>    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
>    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>    at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
>    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> Caused by: java.nio.file.NoSuchFileException: /tmp/junit7860347244680665820/435237d57439f2ceadfedba74dadd6fa/chk-16
>    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>    at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
>    at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:144)
>    at sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
>    at java.nio.file.Files.readAttributes(Files.java:1737)
>    at java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)
>    at java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
>    at java.nio.file.FileTreeWalker.next(FileTreeWalker.java:372)
>    at java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:84)
> {code}
>  
> The referenced checkpoint 16 was aborted and scheduled for deletion, but the
> test does not wait for the deletion to complete before listing the files.
> I think this problem is also present in UnalignedCheckpointRescaleITCase
> (FLINK-22197) and probably in CoordinatedSourceRescaleITCase (FLINK-23577).
> Patch to demonstrate it: 
> https://github.com/rkhachatryan/flink/tree/f23647-demo
> Corresponding [failure|https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1039&view=logs&j=0a15d512-44ac-5ba5-97ab-13a5d066c22c&t=9a028d19-6c4b-5a4e-d378-03fca149d0b1&l=4870]
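Independently of the runtime fix, the test-side crash appears to come from walking the checkpoint directory with a stream over {{FileTreeIterator}} (as in the stack trace above), which throws {{UncheckedIOException}} as soon as an entry disappears mid-walk. One possible test-side mitigation, sketched here under assumptions, is to walk the tree with {{Files.walkFileTree}} and treat a {{NoSuchFileException}} for a vanished entry as "not there". {{RetainedCheckpointFinder}} is a hypothetical helper, not the code of UnalignedCheckpointStressITCase, and it assumes that a completed checkpoint is a {{chk-<id>}} directory containing a {{_metadata}} file. This only tolerates the race inside the test; it is not the fix chosen in the comment above.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Optional;

/** Hypothetical helper, not the code of UnalignedCheckpointStressITCase. */
final class RetainedCheckpointFinder {
    // Assumption: a completed checkpoint directory "chk-<id>" holds a "_metadata" file.
    private static final String METADATA_FILE = "_metadata";
    private static final String CHECKPOINT_DIR_PREFIX = "chk-";

    private RetainedCheckpointFinder() {}

    /** Returns the "chk-<id>" directory with the highest id that contains metadata. */
    static Optional<Path> findLatestCompletedCheckpoint(Path root) throws IOException {
        final Path[] latestDir = {null};
        final long[] latestId = {-1L};
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                Path dir = file.getParent();
                if (dir == null || dir.getFileName() == null
                        || !file.getFileName().toString().equals(METADATA_FILE)) {
                    return FileVisitResult.CONTINUE;
                }
                String dirName = dir.getFileName().toString();
                if (dirName.startsWith(CHECKPOINT_DIR_PREFIX)) {
                    try {
                        long id = Long.parseLong(dirName.substring(CHECKPOINT_DIR_PREFIX.length()));
                        if (id > latestId[0]) {
                            latestId[0] = id;
                            latestDir[0] = dir;
                        }
                    } catch (NumberFormatException ignored) {
                        // Not a numbered checkpoint directory; skip it.
                    }
                }
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
                // An aborted checkpoint may be deleted asynchronously while we walk the
                // tree; treat an entry that vanished mid-walk as absent instead of failing.
                if (exc instanceof NoSuchFileException) {
                    return FileVisitResult.CONTINUE;
                }
                throw exc;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                // Same tolerance if a directory disappears while its entries are listed.
                if (exc != null && !(exc instanceof NoSuchFileException)) {
                    throw exc;
                }
                return FileVisitResult.CONTINUE;
            }
        });
        return Optional.ofNullable(latestDir[0]);
    }
}
```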



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
