[ 
https://issues.apache.org/jira/browse/FLINK-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16669882#comment-16669882
 ] 

ASF GitHub Bot commented on FLINK-10690:
----------------------------------------

zentol closed pull request #6939: [FLINK-10690][tests] Fix Files.list resource leaks
URL: https://github.com/apache/flink/pull/6939
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 23878cbccb8..76b78056440 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -38,6 +38,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -227,12 +228,18 @@ private static void assertDirEquals(java.nio.file.Path 
expected, java.nio.file.P
                assertEquals(expected.getFileName(), actual.getFileName());
 
                if (Files.isDirectory(expected)) {
-                       List<java.nio.file.Path> expectedContents = 
Files.list(expected)
-                               
.sorted(Comparator.comparing(java.nio.file.Path::toString))
-                               .collect(Collectors.toList());
-                       List<java.nio.file.Path> actualContents = 
Files.list(actual)
-                               
.sorted(Comparator.comparing(java.nio.file.Path::toString))
-                               .collect(Collectors.toList());
+                       List<java.nio.file.Path> expectedContents;
+                       try (Stream<java.nio.file.Path> files = 
Files.list(expected)) {
+                               expectedContents = files
+                                       
.sorted(Comparator.comparing(java.nio.file.Path::toString))
+                                       .collect(Collectors.toList());
+                       }
+                       List<java.nio.file.Path> actualContents;
+                       try (Stream<java.nio.file.Path> files = 
Files.list(actual)) {
+                               actualContents = files
+                                       
.sorted(Comparator.comparing(java.nio.file.Path::toString))
+                                       .collect(Collectors.toList());
+                       }
 
                        assertEquals(expectedContents.size(), 
actualContents.size());
 
diff --git 
a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 
b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
index 167101cc9aa..388cdc6adfa 100644
--- 
a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
@@ -26,6 +26,7 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * End-to-end test program for verifying that files are distributed via 
BlobServer and later accessible through
@@ -47,8 +48,10 @@ public static void main(String[] args) throws Exception {
                env.registerCachedFile(inputFile.toString(), "test_data", 
false);
                env.registerCachedFile(inputDir.toString(), "test_dir", false);
 
-               Path containedFile = Files.list(inputDir).findAny()
-                       .orElseThrow(() -> new RuntimeException("Input 
directory must not be empty."));
+               final Path containedFile;
+               try (Stream<Path> files = Files.list(inputDir)) {
+                       containedFile = files.findAny().orElseThrow(() -> new 
RuntimeException("Input directory must not be empty."));
+               }
 
                env.fromElements(1)
                        .map(new TestMapFunction(
@@ -96,8 +99,10 @@ public String map(Integer value) throws Exception {
                                        "initial dir. Input dir path: %s. Cache 
dir path: %s", initialDirPath, testDir));
                        }
 
-                       if 
(Files.list(testDir).map(Path::getFileName).map(Path::toString).noneMatch(path 
-> path.equals(containedFileName))) {
-                               throw new 
RuntimeException(String.format("Cached directory %s should not be empty.", 
testDir));
+                       try (Stream<Path> files = Files.list(testDir)) {
+                               if 
(files.map(Path::getFileName).map(Path::toString).noneMatch(path -> 
path.equals(containedFileName))) {
+                                       throw new 
RuntimeException(String.format("Cached directory %s should not be empty.", 
testDir));
+                               }
                        }
 
                        return Files.readAllLines(testFile)
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
index 1042683c916..2e03197142a 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
@@ -28,8 +28,10 @@
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 /**
  * Tests for the {@link RefCountedFile}.
@@ -49,7 +51,9 @@ public void releaseToZeroRefCounterShouldDeleteTheFile() 
throws IOException {
 
                fileUnderTest.release();
 
-               Assert.assertEquals(0L, 
Files.list(temporaryFolder.getRoot().toPath()).count());
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       Assert.assertEquals(0L, files.count());
+               }
        }
 
        @Test
@@ -76,7 +80,9 @@ public void 
retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws IOExcept
 
                fileUnderTest.release();
                // the file is deleted now
-               Assert.assertEquals(0L, 
Files.list(temporaryFolder.getRoot().toPath()).count());
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       Assert.assertEquals(0L, files.count());
+               }
        }
 
        @Test
@@ -111,7 +117,9 @@ public void flushAfterCloseShouldThrowException() throws 
IOException {
        // ------------------------------------- Utilities 
-------------------------------------
 
        private void verifyTheFileIsStillThere() throws IOException {
-               Assert.assertEquals(1L, 
Files.list(temporaryFolder.getRoot().toPath()).count());
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       Assert.assertEquals(1L, files.count());
+               }
        }
 
        private RefCountedFile getClosedRefCountedFileWithContent(String 
content) throws IOException {
diff --git 
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
 
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
index 0f46f588264..4a1368a815e 100644
--- 
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
+++ 
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
@@ -164,7 +164,9 @@ public void cleanupAndCheckTmpCleanup() throws Exception {
 
                // delete local tmp dir.
                Assert.assertTrue(Files.exists(localTmpDir));
-               Assert.assertEquals(0L, Files.list(localTmpDir).count());
+               try (Stream<java.nio.file.Path> files = 
Files.list(localTmpDir)) {
+                       Assert.assertEquals(0L, files.count());
+               }
                Files.delete(localTmpDir);
 
                // delete also S3 dir.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index c350393371a..22de8a1dcde 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -63,10 +63,10 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertArrayEquals;
@@ -182,14 +182,17 @@ public void after() {
        }
 
        public void assertUploadDirectoryIsEmpty() throws IOException {
-               Preconditions.checkArgument(
-                       1 == Files.list(configuredUploadDir).count(),
-                       "Directory structure in rest upload directory has 
changed. Test must be adjusted");
-               Optional<Path> actualUploadDir = 
Files.list(configuredUploadDir).findAny();
-               Preconditions.checkArgument(
-                       actualUploadDir.isPresent(),
-                       "Expected upload directory does not exist.");
-               assertEquals("Not all files were cleaned up.", 0, 
Files.list(actualUploadDir.get()).count());
+               Path actualUploadDir;
+               try (Stream<Path> containedFiles = 
Files.list(configuredUploadDir)) {
+                       List<Path> files = 
containedFiles.collect(Collectors.toList());
+                       Preconditions.checkArgument(
+                               1 == files.size(),
+                               "Directory structure in rest upload directory 
has changed. Test must be adjusted");
+                       actualUploadDir = files.get(0);
+               }
+               try (Stream<Path> containedFiles = Files.list(actualUploadDir)) 
{
+                       assertEquals("Not all files were cleaned up.", 0, 
containedFiles.count());
+               }
        }
 
        /**
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
index 95c357dc4b6..5cbde5d49c7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
@@ -53,6 +53,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -122,7 +123,10 @@ public void testStopJobAfterSavepoint() throws Exception {
 
                assertThat(jobStatus, isOneOf(JobStatus.CANCELED, 
JobStatus.CANCELLING));
 
-               final List<Path> savepoints = 
Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+               final List<Path> savepoints;
+               try (Stream<Path> savepointFiles = 
Files.list(savepointDirectory)) {
+                       savepoints = 
savepointFiles.map(Path::getFileName).collect(Collectors.toList());
+               }
                assertThat(savepoints, 
hasItem(Paths.get(savepointLocation).getFileName()));
        }
 
@@ -136,7 +140,10 @@ public void 
testStopJobAfterSavepointWithDeactivatedPeriodicCheckpointing() thro
 
                assertThat(jobStatus, isOneOf(JobStatus.CANCELED, 
JobStatus.CANCELLING));
 
-               final List<Path> savepoints = 
Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+               final List<Path> savepoints;
+               try (Stream<Path> savepointFiles = 
Files.list(savepointDirectory)) {
+                       savepoints = 
savepointFiles.map(Path::getFileName).collect(Collectors.toList());
+               }
                assertThat(savepoints, 
hasItem(Paths.get(savepointLocation).getFileName()));
        }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 0635f239c56..4a08b7c8305 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -54,6 +54,7 @@
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -326,16 +327,18 @@ private static void 
waitUntilExternalizedCheckpointCreated(File checkpointDir, J
        }
 
        private static Optional<Path> findExternalizedCheckpoint(File 
checkpointDir, JobID jobId) throws IOException {
-               return 
Files.list(checkpointDir.toPath().resolve(jobId.toString()))
-                       .filter(path -> 
path.getFileName().toString().startsWith("chk-"))
-                       .filter(path -> {
-                               try {
-                                       return Files.list(path).anyMatch(child 
-> child.getFileName().toString().contains("meta"));
-                               } catch (IOException ignored) {
-                                       return false;
-                               }
-                       })
-                       .findAny();
+               try (Stream<Path> checkpoints = 
Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
+                       return checkpoints
+                               .filter(path -> 
path.getFileName().toString().startsWith("chk-"))
+                               .filter(path -> {
+                                       try (Stream<Path> checkpointFiles = 
Files.list(path)) {
+                                               return 
checkpointFiles.anyMatch(child -> 
child.getFileName().toString().contains("meta"));
+                                       } catch (IOException ignored) {
+                                               return false;
+                                       }
+                               })
+                               .findAny();
+               }
        }
 
        private static void waitUntilCanceled(JobID jobId, ClusterClient<?> 
client) throws ExecutionException, InterruptedException {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 5fc356766f3..b272a2128ff 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -25,6 +25,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -41,12 +42,18 @@
        public void testDeleteApplicationFiles() throws Exception {
                final Path applicationFilesDir = 
temporaryFolder.newFolder(".flink").toPath();
                Files.createFile(applicationFilesDir.resolve("flink.jar"));
-               
assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(1L));
-               assertThat(Files.list(applicationFilesDir).count(), 
equalTo(1L));
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       assertThat(files.count(), equalTo(1L));
+               }
+               try (Stream<Path> files = Files.list(applicationFilesDir)) {
+                       assertThat(files.count(), equalTo(1L));
+               }
 
                Utils.deleteApplicationFiles(Collections.singletonMap(
                        YarnConfigKeys.FLINK_YARN_FILES,
                        applicationFilesDir.toString()));
-               
assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(0L));
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       assertThat(files.count(), equalTo(0L));
+               }
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Tests leak resources via Files.list
> -----------------------------------
>
>                 Key: FLINK-10690
>                 URL: https://issues.apache.org/jira/browse/FLINK-10690
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.5.4, 1.6.1, 1.7.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> {{Files.list}} has the unfortunate property that it has to be explicitly 
> closed to clean up the underlying {{DirectoryStream}}. This is _not_ done 
> automatically by collectors.
> Several tests don't close the stream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to